/*
 * Ruby library that wraps the Sleepycat Berkeley DB.
 * 
 * Developed against 4.3/4.4. No support for prior versions.
 * 
 */

#include <bdb.h>
#include <errno.h>
#include <stdio.h>

#define LMEMFLAG 0
#define NOFLAGS 0
#undef DEBUG_DB

#ifdef HAVE_STDARG_PROTOTYPES
#include <stdarg.h>
#define va_init_list(a,b) va_start(a,b)
#else
#include <varargs.h>
#define va_init_list(a,b) va_start(a)
#endif

VALUE mBdb;  /* Top level module */
VALUE cDb;   /* DBT class */
VALUE cDbStat; /* db status class, not specialized for DBTYPE */
VALUE cEnv;  /* Environment class */
VALUE cTxn;  /* Transaction class */
VALUE cCursor; /* Cursors */
VALUE cTxnStat; /* Transaction Status class */
VALUE cTxnStatActive; /* Active Transaction Status class */
VALUE eDbError;

static ID fv_call,fv_uniq,fv_err_new,fv_err_code,fv_err_msg;

#define EXCEPTIONS_CREATE \
	eDbE_create(BUFFER_SMALL, BufferSmall)\
	eDbE_create(LOCK_DEADLOCK, LockDeadlock)\
	eDbE_create(LOCK_NOTGRANTED, LockNotgranted)\
	eDbE_create(REP_HANDLE_DEAD, RepHandleDead)\
	eDbE_create(REP_LEASE_EXPIRED, RepLeaseExpired)\
	eDbE_create(REP_LOCKOUT, RepLockout)\
	eDbE_create(SECONDARY_BAD, SecondaryBad) \
	eDbE_create(FOREIGN_CONFLICT, ForeignConflict) \
	eDbE_create(OLD_VERSION, OldVersion) \
	eDbE_create(KEYEXIST, KeyExist) \
	eDbE_create(KEYEMPTY, KeyEmpty) \
	eDbE_create(RUNRECOVERY, RunRecovery) \
	eDbE_create(VERSION_MISMATCH, VersionMismatch)

#define eDbE_create(n,c) VALUE eDbE_##c;
EXCEPTIONS_CREATE

/*
 * Document-class: Bdb::DbError
 *
 * Errors generated by methods under the Bdb hierarchy will be
 * of this class unless Ruby itself raises the error.
 *
 */

static void
#ifdef HAVE_STDARG_PROTOTYPES
raise_error(int code, const char *fmt, ...)
#else
  raise_error(code,fmt,va_alist)
     int code;
     const char *fmt;
     va_dcl
#endif
{
  /*
   * Format a printf-style message and raise it as a Ruby exception.
   *
   * code selects the exception class: each DB_* value listed in
   * EXCEPTIONS_CREATE maps to its eDbE_* class, anything else (including
   * 0) maps to eDbError. The exception is built with (message, code).
   * This function does not return; rb_exc_raise transfers control.
   */
  va_list args;
  char buf[1024];
  VALUE exc;
  VALUE argv[2];
  VALUE cl;

  va_init_list(args,fmt);
  vsnprintf(buf,sizeof(buf),fmt,args);
  va_end(args);

  argv[0]=rb_str_new2(buf);
  argv[1]=INT2NUM(code);

  switch (code) {
/* Drop the earlier expansion rule before installing the case-label one;
 * redefining a macro with a different body is otherwise ill-formed. */
#undef eDbE_create
#define eDbE_create(n,c) case DB_##n: cl = eDbE_##c; break;
  EXCEPTIONS_CREATE
  default: cl = eDbError; break;
  }

  exc=rb_class_new_instance(2,argv,cl);
  rb_exc_raise(exc);
}

/*
 * An error can only be generated internally
 */
VALUE err_initialize(VALUE obj, VALUE message, VALUE code)
{
  /* Only the message is forwarded to the superclass initializer; the
   * numeric code is kept in an instance variable on the exception. */
  VALUE super_argv[1];
  VALUE stored;

  super_argv[0] = message;
  rb_call_super(1, super_argv);

  stored = rb_ivar_set(obj, fv_err_code, code);
  return stored;
}

/*
 * call-seq:
 *   err.code() -> Bdb error code integer
 *
 */
VALUE err_code(VALUE obj)
{
  /* Reads back the instance variable written by err_initialize. */
  VALUE stored_code = rb_ivar_get(obj, fv_err_code);
  return stored_code;
}

/*
 * Free function registered for Bdb::Db wrappers (see db_alloc); also
 * called by db_init_aux to discard a previous handle. Closes the DB
 * handle when it was opened, then releases the t_dbh itself.
 */
static void db_free(t_dbh *dbh)
{
#ifdef DEBUG_DB
  if ( RTEST(ruby_debug) )
    fprintf(stderr,"%s/%d %s 0x%x\n",__FILE__,__LINE__,"db_free cleanup!",dbh);
#endif

  if (dbh == NULL)
    return;

  if (dbh->db != NULL) {
    /* Only handles flagged as opened are closed here. */
    if (dbh->db_opened == 1) {
      dbh->db->close(dbh->db, NOFLAGS);
    }
    /* In debug mode, report handles that still carry a file name. */
    if (RTEST(ruby_debug) && dbh->filename[0] != '\0') {
      fprintf(stderr,"%s/%d %s %p %s\n",__FILE__,__LINE__,
              "db_free database was still open!",(void*)dbh->db,dbh->filename);
    }
    dbh->db = NULL;
  }

  free(dbh);
}

/*
 * GC mark function for Bdb::Db wrappers: keeps alive every Ruby object
 * that is referenced only from the C-side t_dbh structure.
 */
static void db_mark(t_dbh *dbh)
{
  if ( dbh == NULL ) return;
  /* secondary-index callback stored by db_associate */
  if ( ! NIL_P(dbh->aproc) )
    rb_gc_mark(dbh->aproc);
  /* btree comparison callback stored by db_btree_compare_set; without
   * this mark the proc could be collected while the DB still calls it */
  if ( ! NIL_P(dbh->sproc) )
    rb_gc_mark(dbh->sproc);
  /* owning environment, if the db was created from one */
  if ( dbh->env )
    rb_gc_mark(dbh->env->self);
  /* array of open cursor objects */
  if ( ! NIL_P(dbh->adbc) )
    rb_gc_mark(dbh->adbc);
}

/*
 * GC mark function for cursor wrappers: marks the Ruby object of the
 * database the cursor is attached to, when there is one.
 */
static void dbc_mark(t_dbch *dbch)
{
  t_dbh *owner = dbch->db;

  if (owner == NULL)
    return;
  rb_gc_mark(owner->self);
}
/*
 * Free function for cursor wrappers: closes the cursor if it is still
 * set, then releases the t_dbch memory.
 */
static void dbc_free(void *p)
{
  t_dbch *dbch = (t_dbch *)p;

#ifdef DEBUG_DB
  if ( RTEST(ruby_debug) )
    fprintf(stderr,"%s/%d %s 0x%x\n",__FILE__,__LINE__,
	       "dbc_free cleanup!",p);  
#endif

  if (dbch == NULL)
    return;

  if (dbch->dbc != NULL) {
    dbch->dbc->c_close(dbch->dbc);
    /* In debug mode, report that the cursor had not been closed. */
    if (RTEST(ruby_debug)) {
      fprintf(stderr,"%s/%d %s %p %s\n",__FILE__,__LINE__,
              "dbc_free cursor was still open!",p,dbch->filename);
    }
  }

  free(p);
}

VALUE
db_alloc(VALUE klass)
{
  /* Wrap a NULL data pointer; db_init_aux installs the real t_dbh. */
  VALUE wrapper = Data_Wrap_Struct(klass, db_mark, db_free, NULL);
  return wrapper;
}

/*
 * Create a DB handle (inside environment eh when it is non-NULL) and
 * attach a freshly initialised t_dbh to the Ruby object obj. Any t_dbh
 * already attached to obj is released first. Returns obj; raises via
 * raise_error when db_create fails.
 */
VALUE db_init_aux(VALUE obj,t_envh * eh)
{
  DB *db;
  t_dbh *dbh;
  int rv;

  /* This excludes possible use of X/Open Transaction Mgr */
  rv = db_create(&db,(eh)?eh->env:NULL,NOFLAGS);
  if (rv != 0) {
    raise_error(rv, "db_new failure: %s",db_strerror(rv));
  }

#ifdef DEBUG_DB
  db->set_errfile(db,stderr);
#endif

  dbh=ALLOC(t_dbh);
  if (DATA_PTR(obj)) {
      /* if called from env_db, the data ptr has not been allocated,
       * was freeing 0x0 */
      db_free(DATA_PTR(obj));
  }
  DATA_PTR(obj)=dbh;
  dbh->db=db;
  /* ALLOC does not zero memory; db_free reads this flag, so it must
   * start out as "not opened". */
  dbh->db_opened=0;
  dbh->self=obj;
  dbh->env=eh;
  dbh->aproc=Qnil;
  dbh->sproc=Qnil;
  memset(&(dbh->filename),0,FNLEN+1);

  dbh->adbc=Qnil;

  if (dbh->env) {
#ifdef DEBUG_DB
    fprintf(stderr,"Adding db to env %p %p\n",(void*)obj,(void*)dbh);
#endif
    /* the environment keeps a list of the databases created from it */
    rb_ary_push(dbh->env->adb,obj);
  }
    
  return obj;
}

/*
 * Document-class: Bdb::Db
 *
 */

VALUE db_initialize(VALUE obj)
{
  /* A standalone database has no owning environment. */
  t_envh *no_env = NULL;
  return db_init_aux(obj, no_env);
}

/*
 * call-seq:
 *	db.open(txn_object,disk_file,logical_db,db_type,flags,mode) -> value
 *
 * open a database. disk file is file path. logical_db is
 * a named database within that file, which will be created under
 * conditions noted by DB.
 *
 * db_type is one of the constants:
 * Bdb::DB::BTREE
 * Bdb::DB::HASH
 * Bdb::DB::RECNO
 * Bdb::DB::QUEUE
 * Bdb::DB::UNKNOWN
 *
 * unknown will open an already existing db in the mode created
 */
VALUE db_open(VALUE obj, VALUE vtxn, VALUE vdisk_file,
	      VALUE vlogical_db,
	      VALUE vdbtype, VALUE vflags, VALUE vmode)
{
  /*
   * Validates the Ruby arguments, opens the underlying DB handle and
   * records the file name plus an empty cursor list on success.
   * Returns obj; raises through raise_error on bad arguments, on a
   * closed transaction or handle, and on any DB->open failure.
   */
  t_dbh *dbh;
  int rv;
  t_txnh *txn=NOTXN;
  u_int32_t flags=0;
  DBTYPE dbtype=DB_UNKNOWN;
  char *logical_db=NULL;
  int mode=0;

  if ( ! NIL_P(vflags) )
    flags=NUM2UINT(vflags);

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  /* an empty or non-String logical name means "no named sub-database" */
  if ( TYPE(vlogical_db)==T_STRING && RSTRING_LEN(vlogical_db) > 0 )
    logical_db=StringValueCStr(vlogical_db);
    
  if ( FIXNUM_P(vdbtype) ) {
    dbtype=NUM2INT(vdbtype);
    if ( dbtype < DB_BTREE || dbtype > DB_UNKNOWN ) {
      raise_error(0,"db_open Bad access type: %d",dbtype);
      return Qnil;
    }
  }

  if ( TYPE(vdisk_file)!=T_STRING || RSTRING_LEN(vdisk_file) < 1 ) {
    raise_error(0,"db_open Bad disk file name");
    return Qnil;
  }

  if ( ! NIL_P(vmode) )
    mode=NUM2INT(vmode);

  Data_Get_Struct(obj,t_dbh,dbh);
  /* adbc is only set by a successful open */
  if ( ! NIL_P(dbh->adbc) )
    raise_error(0,"db handle already opened");
  /* the handle is NULL once it has been removed or closed */
  if (!dbh->db)
    raise_error(0,"db is closed");
  
  /* lets the DB callbacks find their way back to this t_dbh */
  dbh->db->app_private=dbh;
  rv = dbh->db->open(dbh->db,txn?txn->txn:NULL,
                     StringValueCStr(vdisk_file),
                     logical_db,
                     dbtype,flags,mode);
  if (rv != 0) {
    raise_error(rv,"db_open failure: %s(%d)",db_strerror(rv),rv);
  }
  filename_copy(dbh->filename,vdisk_file);
  dbh->adbc=rb_ary_new();
  dbh->db_opened = 1;
  return obj;
}

/**
 * call-seq:
 *  db.set_re_len( db, re_len)
 *
 * Set record-length
 */
VALUE db_set_re_len(VALUE obj, VALUE re_len) {
  t_dbh *handle;
  int status;

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db isn't created");
  }

  /* Hand the record length to the DB handle; the argument is returned
   * unchanged on success. */
  status = handle->db->set_re_len(handle->db, NUM2UINT(re_len));
  if (status != 0) {
    raise_error(status, "set_re_len failure: %s",db_strerror(status));
  }
  return re_len;
}

VALUE db_get_re_len( VALUE obj) {
  t_dbh *handle;
  u_int32_t length;
  int status;

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db isn't created");
  }

  /* Query the record length from the DB handle and box it as a Ruby
   * integer. */
  status = handle->db->get_re_len(handle->db, &length);
  if (status != 0) {
    raise_error(status, "db_get_re_len failure: %s",db_strerror(status));
  }
  return UINT2NUM(length);
}

/*
 * call-seq:
 *	db.flags=value
 *
 * set database flags based on DB constants.
 * see http://www.sleepycat.com/docs/api_c/db_set_flags.html
 *
 */
VALUE db_flags_set(VALUE obj, VALUE vflags)
{
  t_dbh *handle;
  int status;
  /* converted before the handle lookup, as a bad argument should fail first */
  u_int32_t wanted = NUM2UINT(vflags);

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  status = handle->db->set_flags(handle->db, wanted);
  if (status != 0)
    raise_error(status, "db_flag_set failure: %s",db_strerror(status));

  return vflags;
}

/*
 * call-seq:
 *	db.flags -> value
 *
 * get database flags.
 * see http://www.sleepycat.com/docs/api_c/db_get_flags.html
 *
 */
VALUE db_flags_get(VALUE obj)
{
  t_dbh *handle;
  u_int32_t current;
  int status;

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  /* Ask the DB handle for its flag word and return it as an integer. */
  status = handle->db->get_flags(handle->db, &current);
  if (status != 0)
    raise_error(status, "db_flag_get failure: %s",db_strerror(status));

  return INT2NUM(current);
}

/*
 * call-seq:
 *	db.pagesize=value
 *
 * set the database page size.
 * see the Berkeley DB documentation for DB->set_pagesize.
 *
 */
VALUE db_pagesize_set(VALUE obj, VALUE vpagesize)
{
  t_dbh *handle;
  int status;
  /* converted before the handle lookup, as a bad argument should fail first */
  u_int32_t wanted = NUM2INT(vpagesize);

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  status = handle->db->set_pagesize(handle->db, wanted);
  if (status != 0)
    raise_error(status, "db_pagesize_set failure: %s",db_strerror(status));

  return vpagesize;
}

/*
 * call-seq:
 *	db.pagesize
 *
 * get the database page size.
 * see the Berkeley DB documentation for DB->get_pagesize.
 *
 */
VALUE db_pagesize(VALUE obj)
{
  t_dbh *handle;
  u_int32_t current;
  int status;

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  /* Read the page size from the DB handle and return it as an integer. */
  status = handle->db->get_pagesize(handle->db, &current);
  if (status != 0)
    raise_error(status, "db_pagesize_get failure: %s",db_strerror(status));

  return INT2NUM(current);
}

/*
 * call-seq:
 *	db.h_ffactor=value
 *
 * set hash db fill factor
 * formula:
 *         (pagesize - 32) / (average_key_size + average_data_size + 8)
 *
 */
VALUE db_h_ffactor_set(VALUE obj, VALUE vint)
{
  t_dbh *handle;
  int status;
  /* converted before the handle lookup, as a bad argument should fail first */
  u_int32_t factor = NUM2INT(vint);

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  status = handle->db->set_h_ffactor(handle->db, factor);
  if (status != 0)
    raise_error(status, "db_h_ffactor_set failure: %s",db_strerror(status));

  return vint;
}

/*
 * call-seq:
 *	db.h_ffactor
 *
 * get hash db fill factor
 * formula:
 *         (pagesize - 32) / (average_key_size + average_data_size + 8)
 * see http://www.sleepycat.com/docs/api_c/db_set_flags.html
 *
 */
VALUE db_h_ffactor(VALUE obj)
{
  t_dbh *handle;
  u_int32_t factor;
  int status;

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  /* Read the hash fill factor from the DB handle. */
  status = handle->db->get_h_ffactor(handle->db, &factor);
  if (status != 0)
    raise_error(status, "db_h_ffactor failure: %s",db_strerror(status));

  return INT2NUM(factor);
}
/*
 * call-seq:
 *	db.h_nelem=value
 *
 * set estimate number of elements in hash table
 * see http://www.sleepycat.com/docs/api_c/db_set_flags.html
 *
 */
VALUE db_h_nelem_set(VALUE obj, VALUE vint)
{
  t_dbh *handle;
  int status;
  /* converted before the handle lookup, as a bad argument should fail first */
  u_int32_t estimate = NUM2INT(vint);

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  status = handle->db->set_h_nelem(handle->db, estimate);
  if (status != 0)
    raise_error(status, "db_h_nelem_set failure: %s",db_strerror(status));

  return vint;
}

/*
 * call-seq:
 *	db.h_nelem
 *
 * get estimate number of element in the hash table
 * see http://www.sleepycat.com/docs/api_c/db_set_flags.html
 *
 */
VALUE db_h_nelem(VALUE obj)
{
  t_dbh *handle;
  u_int32_t estimate;
  int status;

  Data_Get_Struct(obj, t_dbh, handle);
  if (handle->db == NULL) {
    raise_error(0,"db is closed");
  }

  /* Read the hash element-count estimate from the DB handle. */
  status = handle->db->get_h_nelem(handle->db, &estimate);
  if (status != 0)
    raise_error(status, "db_h_nelem failure: %s",db_strerror(status));

  return INT2NUM(estimate);
}

VALUE dbc_close(VALUE);

/* call-seq:
 *	db.close(flags) -> value
 *
 * close a database handle. Will close open cursors.
 */
VALUE db_close(VALUE obj, VALUE vflags)
{
  /*
   * Closes any cursors still registered on the handle, closes the DB
   * handle itself and detaches it from its environment. Returns nil
   * when the handle was already closed, obj otherwise; raises through
   * raise_error when DB->close reports a failure.
   */
  t_dbh *dbh;
  int rv;
  u_int32_t flags;
  VALUE cur;

  flags=NUM2UINT(vflags);
  Data_Get_Struct(obj,t_dbh,dbh);
  if ( dbh->db==NULL )
    return Qnil;

  if (! NIL_P(dbh->adbc) && RARRAY_LEN(dbh->adbc) > 0 ) {
    rb_warning("%s/%d %s",__FILE__,__LINE__,
	       "cursor handles still open");
    while ( (cur=rb_ary_pop(dbh->adbc)) != Qnil ) {
      dbc_close(cur);
    }
  }

  /* filename is an embedded array, so only its first byte tells whether
   * a name was ever recorded */
  if ( RTEST(ruby_debug) )
    rb_warning("%s/%d %s 0x%p %s",__FILE__,__LINE__,"db_close!", (void*)dbh,
	       (dbh->filename[0]=='\0') ? (char*)"unknown" : dbh->filename);

  rv = dbh->db->close(dbh->db,flags);
  /* the handle is unusable after close regardless of rv, so drop every
   * reference to it before a failure is reported */
  dbh->db=NULL;
  dbh->db_opened = 0;
  dbh->aproc=Qnil;
  dbh->sproc=Qnil;
  if ( dbh->env ) {
    if ( RTEST(ruby_debug) )
      rb_warning("%s/%d %s 0x%p",__FILE__,__LINE__,"db_close! removing",(void*)obj);
    rb_ary_delete(dbh->env->adb,obj);
    dbh->env = NULL;
  }
  if ( rv != 0 ) {
    raise_error(rv, "db_close failure: %s",db_strerror(rv));
  }
  return obj;
}

/*
 * call-seq:
 *	db.put(txn,key,data,flags) -> self
 *
 * put a key/data pair into the database. returns db. Will
 * raise an error on DB_KEYEXIST but error.code will indicate
 * so it can be easily caught.
 */
VALUE db_put(VALUE obj, VALUE vtxn, VALUE vkey, VALUE vdata, VALUE vflags)
{
  /*
   * Stores vdata under vkey, optionally inside transaction vtxn.
   * Returns obj, except for a DB_APPEND put, which returns the key
   * assigned by the library as a String. Raises through raise_error
   * on a closed transaction/handle or any DB->put failure.
   */
  t_dbh *dbh;
  int rv;
  int appending;
  u_int32_t flags=0;
  DBT key,data;
  t_txnh *txn=NULL;

  memset(&key,0,sizeof(DBT));
  memset(&data,0,sizeof(DBT));

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  if ( ! NIL_P(vflags) )
    flags=NUM2UINT(vflags);

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0,"db is closed");

  /* DB_APPEND is an operation code, not a bit flag: compare the
   * operation byte (db.h's DB_OPFLAGS_MASK, 0xff) for equality so that
   * other operations sharing bits with it are not mistaken for it. */
  appending = ((flags & 0xff) == DB_APPEND);

  /* Convert both arguments before taking any buffer pointers. */
  StringValue(vdata);
  if ( ! (appending && NIL_P(vkey)) ) {
    /* the key is an output for DB_APPEND, so nil is accepted there */
    StringValue(vkey);
    key.data = RSTRING_PTR(vkey);
    key.size = RSTRING_LEN(vkey);
  }
  key.flags = LMEMFLAG;

  data.data = RSTRING_PTR(vdata);
  data.size = RSTRING_LEN(vdata);
  data.flags = LMEMFLAG;

  rv = dbh->db->put(dbh->db,txn?txn->txn:NULL,&key,&data,flags);
  if (rv != 0) {
    raise_error(rv, "db_put fails: %s",db_strerror(rv));
  }

  if ( appending ) {
    /* With no DBT memory flag set the returned key lives in memory
     * owned by the library: copy it out and do not free it. */
    return rb_str_new(key.data,key.size);
  }

  return obj;
}

/*
 * call-seq:
 *	db.get(txn,key,data,flags) -> String(data)
 *
 * get a key/data pair from database. data as a string.
 * 
 */

VALUE db_get(VALUE obj, VALUE vtxn, VALUE vkey, VALUE vdata, VALUE vflags)
{
  /*
   * Looks up vkey (optionally with vdata as an input data item) and
   * returns the stored data as a new String, or nil when the library
   * reports DB_NOTFOUND. Raises through raise_error on a closed
   * transaction/handle or any other DB->get failure.
   */
  t_dbh *dbh;
  int rv;
  u_int32_t flags=0;
  DBT key,data;
  VALUE str;
  t_txnh *txn=NULL;

  memset(&key,0,sizeof(DBT));
  memset(&data,0,sizeof(DBT));

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  if ( ! NIL_P(vflags) ) {
    flags=NUM2UINT(vflags);
  }
  
  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0,"db is closed");

  StringValue(vkey);

  key.data  = RSTRING_PTR(vkey);
  key.size  = RSTRING_LEN(vkey);
  key.flags = LMEMFLAG;  

  if ( ! NIL_P(vdata) ) {
    StringValue(vdata);
    data.data = RSTRING_PTR(vdata);
    data.size = RSTRING_LEN(vdata);
  }
  /* the library mallocs the returned data; it is freed below */
  data.flags = DB_DBT_MALLOC;

  rv = dbh->db->get(dbh->db,txn?txn->txn:NULL,&key,&data,flags);
  if ( rv == 0 ) {
    str = rb_str_new(data.data,data.size);
    if (data.data) free(data.data);
    return str;
  } else if (rv == DB_NOTFOUND) {
    return Qnil;
  } else {
    raise_error(rv, "db_get failure: %s",db_strerror(rv));
  }
  return Qnil;
}

/*
 * call-seq:
 *	db.pget(txn,key,data,flags) -> [pkey,data]
 *
 * get a key/data pair from database using a secondary index.
 * returns an array with a primary key and the data element.
 * 
 */
VALUE db_pget(VALUE obj, VALUE vtxn, VALUE vkey, VALUE vdata, VALUE vflags)
{
  /*
   * Looks up vkey through a secondary index and returns
   * [primary_key, data] as two new Strings, or nil when the library
   * reports DB_NOTFOUND. Raises through raise_error on a closed
   * transaction/handle or any other DB->pget failure.
   */
  t_dbh *dbh;
  int rv;
  u_int32_t flags=0;
  DBT key,data,pkey;
  t_txnh *txn=NULL;

  memset(&key,0,sizeof(DBT));
  memset(&data,0,sizeof(DBT));
  memset(&pkey,0,sizeof(DBT));

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  if ( ! NIL_P(vflags) ) {
    flags=NUM2UINT(vflags);
  }
  
  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");

  StringValue(vkey);

  key.data = RSTRING_PTR(vkey);
  key.size = RSTRING_LEN(vkey);
  key.flags = LMEMFLAG;

  if ( ! NIL_P(vdata) ) {
    StringValue(vdata);
    data.data = RSTRING_PTR(vdata);
    data.size = RSTRING_LEN(vdata);
    data.flags = LMEMFLAG;
  }

  rv = dbh->db->pget(dbh->db,txn?txn->txn:NULL,&key,&pkey,&data,flags);
  if ( rv == 0 ) {
    /* both results are copied into Ruby strings right away */
    return 
      rb_ary_new3(2,
		  rb_str_new(pkey.data,pkey.size),
		  rb_str_new(data.data,data.size));

  } else if (rv == DB_NOTFOUND) {
    return Qnil;
  } else {
    raise_error(rv, "db_pget failure: %s",db_strerror(rv));
  }
  return Qnil;
}

/*
 * call-seq:
 *	db[key] -> data
 *
 * array ref style data retrieval
 *
 */
VALUE db_aget(VALUE obj, VALUE vkey)
{
  /* db[key] is db_get with no transaction, no input data and no flags. */
  VALUE found = db_get(obj, Qnil, vkey, Qnil, Qnil);
  return found;
}

/*
 * call-seq:
 *	db[key]=data
 *
 * array ref style data storage
 */
VALUE db_aset(VALUE obj, VALUE vkey, VALUE vdata)
{
  /* db[key]=data is db_put with no transaction and no flags. */
  VALUE stored = db_put(obj, Qnil, vkey, vdata, Qnil);
  return stored;
}

/*
 * call-seq:
 *	db.join([cursors],flags) -> join_cursor
 *
 * create a join cursor from an array of cursors.
 * The input cursors will usually be set_range and, if duplicate
 * data items are allowed, they should be DUP_SORT or the performance
 * will be abysmal.
 */
VALUE db_join(VALUE obj, VALUE vacurs, VALUE vflags)
{
  /*
   * Builds a NULL-terminated array of the DBC handles wrapped by the
   * cursor objects in vacurs, asks the DB handle for a join cursor and
   * returns it wrapped in a new cCursor object, registered on the
   * database's cursor list. Raises TypeError when vacurs is not an
   * Array, and through raise_error on a closed handle/cursor or a
   * DB->join failure.
   */
  t_dbh *dbh;
  t_dbch *dbch;
  u_int32_t flags;
  DBC **curs;
  long i,ncurs;
  int rv;
  VALUE jcurs;

  flags=NUM2UINT(vflags);
  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");

  Check_Type(vacurs,T_ARRAY);
  ncurs=RARRAY_LEN(vacurs);
  /* one extra slot for the NULL terminator */
  curs = ALLOCA_N(DBC *,ncurs+1);
  for (i=0; i<ncurs; i++) {
    Data_Get_Struct(RARRAY_PTR(vacurs)[i],t_dbch,dbch);
    /* a closed cursor holds NULL, which would cut the list short */
    if (!dbch->dbc)
      raise_error(0, "dbc is closed");
    curs[i]=dbch->dbc;
  }
  curs[ncurs]=NULL;
  jcurs=Data_Make_Struct(cCursor,t_dbch,dbc_mark,dbc_free,dbch);
  rv =  dbh->db->join(dbh->db,curs,&(dbch->dbc),flags);
  if (rv) {
    raise_error(rv, "db_join: %s",db_strerror(rv));
  }
  dbch->db=dbh;
  rb_ary_push(dbch->db->adbc,jcurs);
  rb_obj_call_init(jcurs,0,NULL);
  return jcurs;
}

#if DB_VERSION_MAJOR == 5 || DB_VERSION_MINOR > 3
/*
 * call-seq:
 *	db.compact(txn,start_key,stop_key,compact_opts,flags) -> end_key
 *
 * compact the database (4.4 and up). start and stop keys limit the
 * range of compaction in BTREE types. compact_opts is currently
 * ignored. The call returns the last key compacted (which could be fed
 * into a subsequent call as the start_key).
 *
 */
VALUE db_compact(VALUE obj, VALUE vtxn, VALUE vstart_key,
		 VALUE vstop_key, VALUE db_compact,
		 VALUE vflags)
{
  /*
   * Runs DB->compact over the optional [start_key, stop_key] range and
   * returns the end key reported by the library as a new String. The
   * db_compact argument is not used (NULL is passed for the compact
   * options). Raises through raise_error on a closed
   * transaction/handle or a DB->compact failure.
   */
  t_dbh *dbh;
  u_int32_t flags;
  t_txnh *txn=NULL;
  DBT start_key, stop_key, end_key;
  int rv;

  flags=NUM2UINT(vflags);
  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");

  memset(&start_key,0,sizeof(DBT));
  memset(&stop_key,0,sizeof(DBT));
  memset(&end_key,0,sizeof(DBT));

  if ( ! NIL_P(vstart_key) ) {
    StringValue(vstart_key);
    start_key.data=RSTRING_PTR(vstart_key);
    start_key.size=RSTRING_LEN(vstart_key);
    start_key.flags= LMEMFLAG;
  }
  if ( ! NIL_P(vstop_key) ) {
    StringValue(vstop_key);
    stop_key.data=RSTRING_PTR(vstop_key);
    stop_key.size=RSTRING_LEN(vstop_key);
    stop_key.flags= LMEMFLAG;
  }
  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  rv=dbh->db->compact(dbh->db,txn?txn->txn:NULL,
		      &start_key,
		      &stop_key,
		      NULL,
		      flags,
		      &end_key);
  if (rv)
    raise_error(rv,"db_compact failure: %s",db_strerror(rv));

  return rb_str_new(end_key.data,end_key.size);
  
}
#endif

/*
 * call-seq:
 * db.get_byteswapped -> true/false
 *
 * true if database is running in swapped mode. This happens when
 * the db is created on a machine with a different endian.
 */
VALUE db_get_byteswapped(VALUE obj)
{
  /*
   * Returns true/false according to DB->get_byteswapped. Raises
   * through raise_error on a closed handle or a library failure.
   */
  t_dbh *dbh;
  int rv;
  int is_swapped;

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");
  rv=dbh->db->get_byteswapped(dbh->db,&is_swapped);
  if (rv)
    raise_error(rv,"db_get_byteswapped failed: %s",db_strerror(rv));
  if (is_swapped)
    return Qtrue;
  else
    return Qfalse;
}

/*
 * call-seq:
 * db.get_type -> Fixnum(type)
 *
 * an integer indicating the type of db (BTREE, HASH, etc).
 */
VALUE db_get_type(VALUE obj)
{
  /*
   * Returns the DBTYPE reported by DB->get_type as a Fixnum. Raises
   * through raise_error on a closed handle or a library failure.
   */
  t_dbh *dbh;
  int rv;
  DBTYPE dbtype;

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");
  rv=dbh->db->get_type(dbh->db,&dbtype);
  if (rv)
    raise_error(rv,"db_get_type failed: %s",db_strerror(rv));
  return INT2FIX(dbtype);
}

/*
 * call-seq:
 * db.remove(disk_file,logical_db,flags) -> true
 *
 * removes a whole database file, or just a logical_db within
 * that file, i.e. if file and logical are both specified, only
 * the logical will be removed. the Bdb::Db instance cannot have
 * been previously used for anything and cannot be used after.
 */
VALUE db_remove(VALUE obj, VALUE vdisk_file,
		VALUE vlogical_db, VALUE vflags)
{
  /*
   * Calls DB->remove with the given file / logical database names (nil
   * is passed as NULL) and returns true. The handle is dropped
   * afterwards whatever the outcome. Raises through raise_error on a
   * closed handle or a library failure.
   */
  t_dbh *dbh;
  int rv;
  u_int32_t flags=0;

  if ( ! NIL_P(vflags) )
    flags=NUM2UINT(vflags);

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");
  rv=dbh->db->remove(dbh->db,
		     NIL_P(vdisk_file)?NULL:StringValueCStr(vdisk_file),
		     NIL_P(vlogical_db)?NULL:StringValueCStr(vlogical_db),
		     flags);
  /* handle cannot be accessed again per docs */
  dbh->db=NULL;
  if (rv)
    raise_error(rv,"db_remove failed: %s",db_strerror(rv));
  return Qtrue;
}

/*
 * call-seq:
 * db.rename(file,logical,newname,flags) -> true
 *
 * rename a file or logical db to newname.
 */
VALUE db_rename(VALUE obj, VALUE vdisk_file,
		VALUE vlogical_db, VALUE newname, VALUE vflags)
{
  /*
   * Calls DB->rename with the given file / logical database names (nil
   * is passed as NULL) and the mandatory new name; returns true.
   * Raises through raise_error when newname is nil, on a closed handle
   * or on a library failure.
   */
  t_dbh *dbh;
  int rv;
  u_int32_t flags=0;

  if ( ! NIL_P(vflags) )
    flags=NUM2UINT(vflags);

  if ( NIL_P(newname) )
    raise_error(0,"db_rename newname must be specified");

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");
  rv=dbh->db->rename(dbh->db,
		     NIL_P(vdisk_file)?NULL:StringValueCStr(vdisk_file),
		     NIL_P(vlogical_db)?NULL:StringValueCStr(vlogical_db),
		     StringValueCStr(newname),
		     flags);
  /* As with DB->remove, the handle may not be accessed again after
   * DB->rename regardless of its return value, so forget it here. */
  dbh->db=NULL;

  if (rv) {
    raise_error(rv,"db_rename failed: %s",db_strerror(rv));
  }
  return Qtrue;
}

/*
 * call-seq:
 * db.sync -> true
 *
 * sync the database out to storage.
 */
VALUE db_sync(VALUE obj)
{
  /*
   * Calls DB->sync with no flags and returns true. Raises through
   * raise_error on a closed handle or a library failure.
   */
  t_dbh *dbh;
  int rv;

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");
  rv=dbh->db->sync(dbh->db,NOFLAGS);

  if (rv)
    raise_error(rv,"db_sync failed: %s",db_strerror(rv));
  return Qtrue;
}

/*
 * call-seq:
 * db.truncate(txn) -> Fixnum(record_count)
 *
 * truncate, i.e. remove all records, purge, trash.
 *
 */
VALUE db_truncate(VALUE obj, VALUE vtxn)
{
  /*
   * Calls DB->truncate (optionally inside transaction vtxn) and returns
   * the record count reported by the library. Raises through
   * raise_error on a closed transaction/handle or a library failure.
   */
  t_dbh *dbh;
  t_txnh *txn=NULL;
  int rv;
  u_int32_t count;

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");

  rv=dbh->db->truncate(dbh->db,txn?txn->txn:NULL,&count,NOFLAGS);
  if (rv)
    raise_error(rv,"db_truncate: %s",db_strerror(rv));

  /* count is a full 32-bit unsigned value, which may not fit a Fixnum */
  return UINT2NUM(count);
}

/**
 * call-seq:
 *  db.set_encrypt(db, passwd)
 *
 * Set encrypt
 */
VALUE db_set_encrypt(VALUE obj, VALUE vpasswd) {
    /*
     * Passes vpasswd to DB->set_encrypt with the AES flag and returns
     * vpasswd. Raises through raise_error on a closed handle or a
     * library failure.
     */
    int rv;
    t_dbh *dbh;
    const char *passwd;
    u_int32_t flags=0x00000001; //DB_ENCRYPT_AES
 
    passwd = StringValueCStr(vpasswd);
    Data_Get_Struct(obj,t_dbh,dbh);
    /* same guard as the other setters: the handle is NULL once closed */
    if (!dbh->db)
        raise_error(0,"db is closed");
       
    rv = dbh->db->set_encrypt(dbh->db, passwd, flags);
    
    if ( rv != 0 )
        raise_error(rv, "set_encrypt failure: %s",db_strerror(rv));
    return vpasswd;
}

/*
 * call-seq:
 * db.key_range(txn,vkey,flags) -> [#less,#same,#greater]
 *
 * calculate position of key within database. returns the estimated
 * proportions of keys less than, equal to and greater than the given
 * key as an array of Float-s
 */
VALUE db_key_range(VALUE obj, VALUE vtxn, VALUE vkey, VALUE vflags)
{
  /*
   * Calls DB->key_range for vkey and returns [less, equal, greater] as
   * three Floats taken from the DB_KEY_RANGE result. Raises through
   * raise_error on a closed transaction/handle or a library failure.
   */
  t_dbh *dbh;
  t_txnh *txn=NULL;
  DBT key;
  u_int32_t flags = 0;
  int rv;
  DB_KEY_RANGE key_range;
  VALUE result;

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  if ( ! NIL_P(vflags) )
    flags=NUM2UINT(vflags);

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");

  memset(&key,0,sizeof(DBT));
  StringValue(vkey);
  key.data = RSTRING_PTR(vkey);
  key.size = RSTRING_LEN(vkey);
  key.flags = LMEMFLAG;

  rv=dbh->db->key_range(dbh->db,txn?txn->txn:NULL,&key,
			&key_range,flags);
  if (rv)
    raise_error(rv,"db_key_range: %s",db_strerror(rv));

  result=rb_ary_new3(3,
		     rb_float_new(key_range.less),
		     rb_float_new(key_range.equal),
		     rb_float_new(key_range.greater));
  return result;
}

/*
 * call-seq:
 * db.del(txn,key,flags) -> true/nil
 *
 * delete records with the key given. DUP values will be removed
 * as a group. true if extant records were deleted. nil if the
 * operation resulted in DB indicating DB_NOTFOUND.
 *
 */
VALUE db_del(VALUE obj, VALUE vtxn, VALUE vkey, VALUE vflags)
{
  /*
   * Calls DB->del for vkey (optionally inside transaction vtxn).
   * Returns true on success and nil when the library reports
   * DB_NOTFOUND. Raises through raise_error on a closed
   * transaction/handle or any other library failure.
   */
  t_dbh *dbh;
  int rv;
  u_int32_t flags;
  DBT key;
  t_txnh *txn=NULL;

  memset(&key,0,sizeof(DBT));

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  flags=NUM2UINT(vflags);
  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");

  StringValue(vkey);
  key.data = RSTRING_PTR(vkey);
  key.size = RSTRING_LEN(vkey);
  key.flags = LMEMFLAG;

  rv = dbh->db->del(dbh->db,txn?txn->txn:NULL,&key,flags);
  if ( rv == DB_NOTFOUND ) {
    return Qnil;
  } else if (rv != 0) {
    raise_error(rv, "db_del failure: %s",db_strerror(rv));
  }
  return Qtrue;
}

/*
 * Fill *result with a malloc'ed copy of the bytes of obj (converted with
 * StringValue, which raises TypeError for non-string-like objects). The
 * DBT is flagged DB_DBT_APPMALLOC so the copy is owned by whoever
 * consumes the DBT. Raises NoMemoryError when the copy cannot be
 * allocated.
 */
void assoc_key(DBT* result, VALUE obj) {
  VALUE  key = StringValue(obj);
  size_t len = (size_t)RSTRING_LEN(key);
  /* malloc(0) may legitimately return NULL, so always ask for a byte */
  char  *str = malloc(len > 0 ? len : 1);

  if (str == NULL)
    rb_memerror();
  memcpy(str, RSTRING_PTR(key), len);

#ifdef DEBUG_DB
  fprintf(stderr,"assoc_key %.*s", (int)len, str);
#endif

  result->size  = (u_int32_t)len;
  result->flags = LMEMFLAG | DB_DBT_APPMALLOC;
  result->data  = str;
}

VALUE assoc_call(VALUE *args)
{
  /* args[0] is the receiver of "call"; args[1..3] are its three
   * arguments. */
  VALUE receiver = args[0];
  VALUE outcome  = rb_funcall(receiver, fv_call, 3, args[1], args[2], args[3]);
  return outcome;
}

/*
 * Rescue handler used with rb_rescue by assoc_callback: emits the
 * exception text as a Ruby warning and stores the exception in *error
 * so the caller can tell that the proc failed. Always returns nil.
 */
VALUE assoc_rescue(VALUE *error, VALUE e)
{
  /* to_s conversion: exceptions are not implicitly convertible to
   * String, so StringValue could itself raise here */
  VALUE message = rb_obj_as_string(e);
  /* never use the exception text as the format string */
  rb_warn("%s", RSTRING_PTR(message));
  *error = e;
  return Qnil;
}

/*
 * Secondary-key callback handed to DB->associate. Calls the Ruby proc
 * stored in the secondary handle's t_dbh with (secdb object, primary
 * key, data) and converts its result into skey:
 *   - nil or an empty array    -> DB_DONOTINDEX
 *   - a single value           -> one malloc'ed key
 *   - an array of several keys -> a DB_DBT_MULTIPLE array of DBTs
 * Returns 0 on success, 99999 when the proc raised (the exception is
 * reported as a warning by assoc_rescue) and ENOMEM when the key array
 * cannot be allocated.
 */
int assoc_callback(DB *secdb, const DBT* pkey, const DBT* data, DBT* skey)
{
  t_dbh *dbh;
  VALUE error = Qnil;
  VALUE retv;
  VALUE args[4];
  VALUE keys;
  DBT *multi;
  u_int32_t i;

  memset(skey,0,sizeof(DBT));
  dbh=secdb->app_private;

  args[0]=dbh->aproc;
  args[1]=dbh->self;
  args[2]=rb_str_new(pkey->data,pkey->size);
  args[3]=rb_str_new(data->data,data->size);

#ifdef DEBUG_DB
  fprintf(stderr,"assoc_data %.*s", (int)data->size, (const char *)data->data);
#endif

  /* run the proc under rb_rescue so its exceptions become an error code */
  retv=rb_rescue((VALUE(*)_((VALUE)))assoc_call,(VALUE)args,(VALUE(*)_((VALUE)))assoc_rescue,(VALUE)&error);
  
  if (!NIL_P(error)) return 99999;
  if (NIL_P(retv))
    return DB_DONOTINDEX;

  keys = rb_check_array_type(retv);
  if (!NIL_P(keys)) {
    keys = rb_funcall(keys,fv_uniq,0); /* secondary keys must be uniq */
    switch(RARRAY_LEN(keys)) {
    case 0:
      return DB_DONOTINDEX;
    case 1:
      retv=RARRAY_PTR(keys)[0];
      break;
    default:
      skey->size = RARRAY_LEN(keys);
      /* zero-filled array of DBTs, one per secondary key */
      multi = calloc(skey->size, sizeof(DBT));
      if (multi == NULL) {
        skey->size = 0;
        return ENOMEM;
      }
      skey->flags = LMEMFLAG | DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
      skey->data  = multi;

      for (i=0; i<skey->size; i++) {
        assoc_key(&multi[i], (VALUE)RARRAY_PTR(keys)[i]);
      }
      return 0;
    }
  }

  assoc_key(skey, retv);
  return 0;
}

/*
 * call-seq:
 * db.associate(txn,sec_db,flags,proc)
 *
 * associate a secondary index(database) with this (primary)
 * database. The proc can be nil if the database is only opened
 * DB_RDONLY.
 *
 * call back proc has signature:
 * proc(secdb,key,value)
 */
VALUE db_associate(VALUE obj, VALUE vtxn, VALUE osecdb,
		   VALUE vflags, VALUE cb_proc)
{
  /*
   * Associates the secondary database osecdb with this primary one.
   * With a Proc, the proc is stored on the secondary handle and
   * assoc_callback is registered as the key extractor. With nil, the
   * association is made without a callback, which is only accepted when
   * both handles report DB_RDONLY in their open flags. Returns true;
   * raises through raise_error on closed handles/transaction, a
   * non-Proc callback, or a DB->associate failure.
   */
  t_dbh *sdbh,*pdbh;
  int rv;
  u_int32_t flags,flagsp,flagss;
  t_txnh *txn=NOTXN;

  flags=NUM2UINT(vflags);
  Data_Get_Struct(obj,t_dbh,pdbh);
  if (!pdbh->db)
    raise_error(0, "db is closed");
  Data_Get_Struct(osecdb,t_dbh,sdbh);
  if (!sdbh->db)
    raise_error(0, "sdb is closed");

  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
      raise_error(0, "txn is closed");
  }

  if ( cb_proc == Qnil ) {
    rb_warning("db_associate: no association may be applied");
    pdbh->db->get_open_flags(pdbh->db,&flagsp);
    sdbh->db->get_open_flags(sdbh->db,&flagss);
    if ( flagsp & DB_RDONLY & flagss ) {
      rv=pdbh->db->associate(pdbh->db,txn?txn->txn:NULL,
			     sdbh->db,NULL,flags);
      if (rv)
	raise_error(rv,"db_associate: %s",db_strerror(rv));
      return Qtrue;
    } else {
      raise_error(0,"db_associate empty associate only available when both DBs opened with DB_RDONLY");
    }
  } else if ( rb_obj_is_instance_of(cb_proc,rb_cProc) != Qtrue ) {
    raise_error(0, "db_associate proc required");
  }
  
  /* stored on the secondary handle, where assoc_callback looks it up */
  sdbh->aproc=cb_proc;

  rv=pdbh->db->associate(pdbh->db,txn?txn->txn:NULL,sdbh->db,assoc_callback,flags);
#ifdef DEBUG_DB
  fprintf(stderr,"assoc done %p\n",(void*)sdbh);
#endif
  if (rv != 0) {
    raise_error(rv, "db_associate failure: %s",db_strerror(rv));
  }
  return Qtrue;
}

VALUE
bt_compare_callback2(VALUE *args)
{
  /* args[0] is the receiver of "call"; args[1..3] are its three
   * arguments. */
  VALUE receiver = args[0];
  VALUE outcome  = rb_funcall(receiver, fv_call, 3, args[1], args[2], args[3]);
  return outcome;
}

/*
 * Btree comparison callback registered by db_btree_compare_set. Calls
 * the stored proc with (db object, key1, key2) and returns its Fixnum
 * result as an int; a non-Fixnum result raises TypeError.
 */
int bt_compare_callback(DB *db, const DBT* key1, const DBT* key2)
{
  t_dbh *owner = db->app_private;
  VALUE verdict;
  int cmp;

  /* Shouldn't catch exceptions in the callback, because bad sort data will corrupt the BTree.*/
  verdict = rb_funcall(owner->sproc, fv_call, 3, owner->self,
                       rb_str_new(key1->data,key1->size),
                       rb_str_new(key2->data,key2->size));

  if (!FIXNUM_P(verdict)) {
    rb_raise(rb_eTypeError,"btree comparison should return Fixnum");
  }

  cmp = FIX2INT(verdict);

#ifdef DEBUG_DB
  fprintf(stderr,"bt_compare %*s <=> %*s: %d", key1->size, key1->data, key2->size, key2->data, cmp);
#endif

  return cmp;
}

/*
 * call-seq:
 * db.btree_compare = proc
 *
 * set the btree key comparison function to the callback proc.
 *
 * callback proc has signature:
 * proc(db,key1,key2)
 */
VALUE db_btree_compare_set(VALUE obj, VALUE cb_proc)
{
  /*
   * Stores cb_proc on the handle and registers bt_compare_callback as
   * the btree comparison function. Returns true; raises through
   * raise_error on a closed handle, a non-Proc argument, or a
   * set_bt_compare failure.
   */
  t_dbh *dbh;
  int rv;

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise_error(0, "db is closed");

  if ( rb_obj_is_instance_of(cb_proc,rb_cProc) != Qtrue ) {
    raise_error(0, "db_btree_compare_set proc required");
  }
  
  /* bt_compare_callback reads the proc back from this field */
  dbh->sproc=cb_proc;
  rv=dbh->db->set_bt_compare(dbh->db,bt_compare_callback);

#ifdef DEBUG_DB
  fprintf(stderr,"btree_compare set %p\n",(void*)dbh);
#endif
  if (rv != 0) {
    raise_error(rv, "db_btree_compare_set failure: %s",db_strerror(rv));
  }
  return Qtrue;
}

/*
 * call-seq:
 * db.cursor(txn,flags)
 *
 * open a cursor
 */
VALUE db_cursor(VALUE obj, VALUE vtxn, VALUE vflags)
{
  u_int32_t flags = NUM2UINT(vflags);
  t_dbh *owner;
  t_dbch *cur;
  t_txnh *txn = NOTXN;
  VALUE cursor_obj;
  int status;

  Data_Get_Struct(obj, t_dbh, owner);
  if (!owner->db)
    raise(0, "db is closed");

  /* Allocate the Ruby-side wrapper first; the BDB cursor is opened into it. */
  cursor_obj = Data_Make_Struct(cCursor, t_dbch, dbc_mark, dbc_free, cur);

  /* A transaction is optional; nil leaves txn at NOTXN. */
  if (!NIL_P(vtxn)) {
    Data_Get_Struct(vtxn, t_txnh, txn);
    if (!txn->txn)
        raise(0, "txn is closed");
  }

  status = owner->db->cursor(owner->db, (txn ? txn->txn : NULL),
                             &(cur->dbc), flags);
  if (status != 0)
    raise_error(status, "db_cursor: %s", db_strerror(status));

  /* Link the cursor to its database and add it to the db's cursor array. */
  filename_dup(cur->filename, owner->filename);
  cur->db = owner;
  rb_ary_push(owner->adbc, cursor_obj);

  rb_obj_call_init(cursor_obj, 0, NULL);
  return cursor_obj;
}

/*
 * call-seq:
 * dbc.close -> nil
 *
 * close an open cursor
 */
VALUE dbc_close(VALUE obj)
{
  t_dbch *cur;
  int status;

  Data_Get_Struct(obj, t_dbch, cur);

  /* No live cursor: nothing to do. */
  if (cur->dbc == NULL)
    return Qnil;

  status = cur->dbc->c_close(cur->dbc);

  /* Detach from the db's cursor array and clear both pointers before
   * the close result is examined. */
  rb_ary_delete(cur->db->adbc, obj);
  cur->db = NULL;
  cur->dbc = NULL;

  if (status != 0)
    raise_error(status, "dbc_close: %s", db_strerror(status));

  return Qnil;
}

/*
 * call-seq:
 * dbc.get(key,data,flags) -> [key,data]
 *
 * get data by key or key and data. returns array of key,data
 */
VALUE dbc_get(VALUE obj, VALUE vkey, VALUE vdata, VALUE vflags)
{
  u_int32_t flags = NUM2UINT(vflags);
  t_dbch *cur;
  DBT key, data;
  int status;

  Data_Get_Struct(obj, t_dbch, cur);
  if (!cur->dbc)
    raise(0, "dbc is closed");

  memset(&key, 0, sizeof(DBT));
  memset(&data, 0, sizeof(DBT));

  /* nil leaves the corresponding DBT zeroed; otherwise it points at the
   * Ruby string's buffer. */
  if (!NIL_P(vkey)) {
    StringValue(vkey);
    key.data = RSTRING_PTR(vkey);
    key.size = RSTRING_LEN(vkey);
    key.flags = LMEMFLAG;
  }
  if (!NIL_P(vdata)) {
    StringValue(vdata);
    data.data = RSTRING_PTR(vdata);
    data.size = RSTRING_LEN(vdata);
    data.flags = LMEMFLAG;
  }

  status = cur->dbc->c_get(cur->dbc, &key, &data, flags);

  if (status == DB_NOTFOUND)
    return Qnil;

  if (status != 0) {
    raise_error(status, "dbc_get %s", db_strerror(status));
    return Qnil;
  }

  /* Success: return [key, data] as fresh Ruby strings. */
  return rb_ary_new3(2, rb_str_new(key.data, key.size),
                     rb_str_new(data.data, data.size));
}
/*
 * call-seq:
 * dbc.pget(key,data,flags) -> [key,pkey,data]
 *
 * cursor pget, returns array(key, primary key, data)
 */
VALUE dbc_pget(VALUE obj, VALUE vkey, VALUE vdata, VALUE vflags)
{
  u_int32_t flags = NUM2UINT(vflags);
  t_dbch *cur;
  DBT key, data, pkey;
  int status;

  Data_Get_Struct(obj, t_dbch, cur);
  if (!cur->dbc)
    raise(0, "dbc is closed");

  memset(&key, 0, sizeof(DBT));
  memset(&data, 0, sizeof(DBT));
  memset(&pkey, 0, sizeof(DBT));

  /* Key and data are optional inputs; pkey is output only. */
  if (!NIL_P(vkey)) {
    StringValue(vkey);
    key.data = RSTRING_PTR(vkey);
    key.size = RSTRING_LEN(vkey);
    key.flags = LMEMFLAG;
  }
  if (!NIL_P(vdata)) {
    StringValue(vdata);
    data.data = RSTRING_PTR(vdata);
    data.size = RSTRING_LEN(vdata);
    data.flags = LMEMFLAG;
  }

  status = cur->dbc->c_pget(cur->dbc, &key, &pkey, &data, flags);

  if (status == DB_NOTFOUND)
    return Qnil;

  if (status != 0) {
    raise_error(status, "dbc_pget %s", db_strerror(status));
    return Qnil;
  }

  /* Success: return [key, primary key, data]. */
  return rb_ary_new3(3,
                     rb_str_new(key.data, key.size),
                     rb_str_new(pkey.data, pkey.size),
                     rb_str_new(data.data, data.size));
}

/*
 * call-seq:
 * dbc.put(key,data,flags) -> data
 *
 * cursor put key/data
 */
VALUE dbc_put(VALUE obj, VALUE vkey, VALUE vdata, VALUE vflags)
{
  t_dbch *cur;
  DBT key, data;
  u_int32_t flags;
  int status;

  /* The data element is mandatory; the key may be nil. */
  if (NIL_P(vdata))
    raise_error(0,"data element is required for put");

  flags = NUM2UINT(vflags);

  Data_Get_Struct(obj, t_dbch, cur);
  if (!cur->dbc)
    raise(0, "dbc is closed");

  memset(&key, 0, sizeof(DBT));
  memset(&data, 0, sizeof(DBT));

  if (!NIL_P(vkey)) {
    StringValue(vkey);
    key.data = RSTRING_PTR(vkey);
    key.size = RSTRING_LEN(vkey);
    key.flags = LMEMFLAG;
  }

  StringValue(vdata);
  data.data = RSTRING_PTR(vdata);
  data.size = RSTRING_LEN(vdata);
  data.flags = LMEMFLAG;

  status = cur->dbc->c_put(cur->dbc, &key, &data, flags);
  if (status != 0)
    raise_error(status, "dbc_put failure: %s", db_strerror(status));

  /* The stored data object is handed back to the caller. */
  return vdata;
}

/*
 * call-seq:
 * dbc.del -> true|nil
 *
 * delete tuple at current cursor position. returns true if
 * something was deleted, nil otherwise (DB_KEYEMPTY)
 *
 */
VALUE dbc_del(VALUE obj)
{
  t_dbch *cur;
  int status;

  Data_Get_Struct(obj, t_dbch, cur);
  if (!cur->dbc)
    raise(0, "dbc is closed");

  status = cur->dbc->c_del(cur->dbc, NOFLAGS);

  if (status == 0)
    return Qtrue;

  /* DB_KEYEMPTY is reported as nil rather than as an error. */
  if (status == DB_KEYEMPTY)
    return Qnil;

  raise_error(status, "dbc_del failure: %s", db_strerror(status));
  return Qtrue;
}

/*
 * call-seq:
 * dbc.count -> Fixnum(count)
 *
 * returns cursor count as per DB.
 */
VALUE dbc_count(VALUE obj)
{
  /*
   * Ask the cursor for its count (DBC->c_count) and return it as a Ruby
   * Integer. A non-zero return code is reported via raise_error.
   */
  t_dbch *dbch;
  int rv;
  db_recno_t count = 0;

  Data_Get_Struct(obj,t_dbch,dbch);
  if (!dbch->dbc)
    raise(0, "dbc is closed");
  rv = dbch->dbc->c_count(dbch->dbc,&count,NOFLAGS);
  if (rv != 0)
    raise_error(rv, "db_count failure: %s",db_strerror(rv));

  /* INT2FIX performs no range check; UINT2NUM converts an unsigned count
   * safely and promotes to Bignum where a Fixnum cannot hold the value.
   * NOTE(review): assumes db_recno_t is an unsigned 32-bit type - confirm
   * against db.h. */
  return UINT2NUM(count);
}

static void env_free(void *p)
{
  /* Free hook for the t_envh wrapper: closes a still-open DB_ENV and
   * releases the struct. */
  t_envh *handle = (t_envh *)p;

#ifdef DEBUG_DB
  if ( RTEST(ruby_debug) )
    fprintf(stderr,"%s/%d %s 0x%x\n",__FILE__,__LINE__,"env_free cleanup!",p);
#endif

  if (handle == NULL)
    return;

  if (handle->env != NULL) {
    handle->env->close(handle->env, NOFLAGS);
    handle->env = NULL;
  }

  free(p);
}
static void env_mark(t_envh *eh)
{
  /* GC mark hook: mark the adb and atxn values held by the env handle. */
  VALUE open_dbs = eh->adb;
  VALUE open_txns = eh->atxn;

  rb_gc_mark(open_dbs);
  rb_gc_mark(open_txns);
}

/*
 * Document-class: Bdb::Env
 *
 * Encapsulated DB environment. Create by simple new (with
 * flags as needed), then open. Database handles created with
 * env.db -> Bdb::Db object.
 */

/*
 * call-seq:
 * new(flags) -> object
 *
 *
 */
VALUE env_new(VALUE class, VALUE vflags)
{
  t_envh *handle;
  VALUE self;
  u_int32_t flags = 0;
  int status;

  /* nil flags means "no flags". */
  if (!NIL_P(vflags))
    flags = NUM2UINT(vflags);

  self = Data_Make_Struct(class, t_envh, env_mark, env_free, handle);

  status = db_env_create(&(handle->env), flags);
  if (status != 0) {
    raise_error(status, "env_new: %s", db_strerror(status));
    return Qnil;
  }

  /* Back-reference plus the two tracking arrays for dbs and txns. */
  handle->self = self;
  handle->adb = rb_ary_new();
  handle->atxn = rb_ary_new();

  rb_obj_call_init(self, 0, NULL);
  return self;
}

/*
 * call-seq:
 * env.open(homedir,flags,mode) -> self
 *
 * open an environment
 */
VALUE env_open(VALUE obj, VALUE vhome, VALUE vflags, VALUE vmode)
{
  t_envh *handle;
  u_int32_t flags = 0;
  int mode = 0;
  int status;

  /* Both flags and mode default to 0 when nil is passed. */
  if (!NIL_P(vflags))
    flags = NUM2UINT(vflags);
  if (!NIL_P(vmode))
    mode = NUM2INT(vmode);

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");
  if (NIL_P(handle->adb))
    raise_error(0,"env handle already used and closed");

  status = handle->env->open(handle->env, StringValueCStr(vhome), flags, mode);

  /* Record the wrapper in app_private whether or not the open succeeded. */
  handle->env->app_private = handle;

  if (status != 0)
    raise_error(status, "env_open failure: %s", db_strerror(status));

  return obj;
}

VALUE txn_abort(VALUE);

/*
 * call-seq:
 * env.close -> self
 *
 * close an environment. Do not use it after you close it. 
 * The close process will automatically abort any open transactions
 * and close open databases (which also closes open cursors).
 * But this is just an effort to keep your dbs and envs from
 * becoming corrupted due to ruby errors that exit
 * unintentionally. However, to make this at all worth anything
 * use ObjectSpace.define_finalizer which calls env.close to
 * approach some assurance of it happening.
 *
 */
VALUE env_close(VALUE obj)
{
  t_envh *eh;
  VALUE pending;
  int status;

  Data_Get_Struct(obj, t_envh, eh);

  /* Closing twice is a no-op. */
  if (eh->env == NULL)
    return Qnil;

  /* Close every database still registered with this environment. */
  if (RARRAY_LEN(eh->adb) > 0) {
    rb_warning("%s/%d %s %li",__FILE__,__LINE__,
               "database handles still open",RARRAY_LEN(eh->adb));
    while (RARRAY_LEN(eh->adb) > 0) {
      pending = rb_ary_pop(eh->adb);
      if (pending == Qnil)
        continue;
      rb_warning("%s/%d %s 0x%p",__FILE__,__LINE__,
                 "closing",(void*)pending);
      /* this could raise! needs rb_protect */
      db_close(pending, INT2FIX(0));
    }
  }

  /* Abort every transaction still registered with this environment. */
  if (RARRAY_LEN(eh->atxn) > 0) {
    rb_warning("%s/%d %s",__FILE__,__LINE__,
               "database transactions still open");
    for (;;) {
      pending = rb_ary_pop(eh->atxn);
      if (pending == Qnil)
        break;
      /* this could raise! needs rb_protect */
      txn_abort(pending);
    }
  }

  if (RTEST(ruby_debug))
    rb_warning("%s/%d %s 0x%p",__FILE__,__LINE__,"env_close!",(void*)eh);

  status = eh->env->close(eh->env, NOFLAGS);

  /* The handle is cleared regardless of the close result. */
  eh->env = NULL;
  eh->adb = Qnil;
  eh->atxn = Qnil;

  if (status != 0) {
    raise_error(status, "env_close failure: %s", db_strerror(status));
    return Qnil;
  }
  return obj;
}

/*
 * call-seq:
 * env.db -> Bdb::Db instance
 *
 * create a db associated with an environment
 */
VALUE env_db(VALUE obj)
{
  t_envh *handle;
  VALUE new_db;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  /* Wrap an empty Db object; db_init_aux receives it with this env. */
  new_db = Data_Wrap_Struct(cDb, db_mark, db_free, 0);
  return db_init_aux(new_db, handle);
}

/*
 * call-seq:
 * env.cachesize=Fixnum|Bignum
 *
 * set the environment cache size. If it is a Bignum then it
 * will populate the bytes and gbytes part of the DB struct.
 * Fixnums will only populate bytes (which is still pretty big).
 */
VALUE env_set_cachesize(VALUE obj, VALUE size)
{
  /*
   * Split a byte count given as a Ruby Integer into the (gbytes, bytes)
   * pair passed to DB_ENV->set_cachesize, with ncache fixed at 1.
   * Returns Qtrue; errors are reported via raise_error.
   *
   * Fixnum and Bignum are both converted through a 64-bit total. On
   * 64-bit Ruby a size of 4 GiB or more is still a Fixnum, so the
   * Fixnum path must not go through a 32-bit conversion.
   */
  const unsigned long long gigabyte = 1024ULL*1024ULL*1024ULL;
  t_envh *eh;
  unsigned long long total;
  u_int32_t bytes, gbytes;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");

  if ( FIXNUM_P(size) ) {
    long fix = FIX2LONG(size);
    /* A negative size is rejected instead of wrapping to a huge value. */
    if ( fix < 0 ) {
      raise_error(0,"set_cachesize requires non-negative number");
      return Qnil;
    }
    total = (unsigned long long)fix;
  } else if ( TYPE(size) == T_BIGNUM ) {
    total = rb_big2ull(size);
  } else {
    raise_error(0,"set_cachesize requires number");
    return Qnil;
  }

  /* Do the split in 64-bit arithmetic rather than relying on 32-bit
   * wraparound. */
  gbytes = (u_int32_t)(total / gigabyte);
  bytes = (u_int32_t)(total % gigabyte);

  rv=eh->env->set_cachesize(eh->env,gbytes,bytes,1);
  if ( rv != 0 ) {
    raise_error(rv, "set_cachesize failure: %s",db_strerror(rv));
    return Qnil;
  }

  return Qtrue;
}

/*
 * call-seq:
 * env.cachesize -> Fixnum|Bignum
 *
 * return the environment cache size in bytes. The gbytes and bytes
 * parts reported by the DB are combined into a single number, which
 * is a Bignum when it does not fit in a Fixnum.
 */
VALUE env_get_cachesize(VALUE obj)
{
  /*
   * Read (gbytes, bytes, ncache) through DB_ENV->get_cachesize and
   * return gbytes * 2^30 + bytes as a Ruby Integer. ncache is fetched
   * but not returned.
   */
  const unsigned long long gigabyte = 1024ULL*1024ULL*1024ULL;
  t_envh *eh;
  u_int32_t bytes=0,gbytes=0;
  int ncache;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");

  rv=eh->env->get_cachesize(eh->env,&gbytes,&bytes,&ncache);
  if ( rv != 0 ) {
    raise_error(rv, "get_cachesize failure: %s",db_strerror(rv));
    return Qnil;
  }

  /* Widen before multiplying: with 32-bit operands gbytes*2^30
   * overflows as soon as gbytes reaches 4. ULL2NUM yields a Fixnum
   * when the value fits. */
  return ULL2NUM((unsigned long long)gbytes * gigabyte + bytes);
}

VALUE env_set_flags(VALUE obj, VALUE vflags, int onoff)
{
  t_envh *handle;
  int status;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  /* nil flags: nothing to change. */
  if (NIL_P(vflags))
    return Qtrue;

  status = handle->env->set_flags(handle->env, NUM2UINT(vflags), onoff);
  if (status != 0) {
    raise_error(status, "set_flags failure: %s", db_strerror(status));
    return Qnil;
  }
  return Qtrue;
}

/*
 * call-seq:
 * env.flags -> flags
 *
 * get what flags are on.
 */
VALUE env_get_flags(VALUE obj)
{
  /*
   * Return the value reported by DB_ENV->get_flags as a Ruby Integer.
   * A non-zero return code is reported via raise_error.
   */
  t_envh *eh;
  int rv;
  u_int32_t flags = 0;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");

  rv=eh->env->get_flags(eh->env,&flags);

  if ( rv != 0 ) {
    /* The message now names the getter (it previously said "set_flags"). */
    raise_error(rv, "get_flags failure: %s",db_strerror(rv));
    return Qnil;
  }

  return UINT2NUM(flags);
}

/*
 * call-seq:
 * env.flags_on=flags
 *
 * set the 'flags' on. An or'ed set of flags will be set on.
 * Only included flags will be affected. Serialized calls
 * will only affect flags indicated (leaving others, default or
 * set as they were).
 */
VALUE env_set_flags_on(VALUE obj, VALUE vflags)
{
  /* Delegate to env_set_flags with onoff = 1. */
  const int turn_on = 1;

  return env_set_flags(obj, vflags, turn_on);
}

/*
 * call-seq:
 * env.flags_off=flags
 *
 * set the 'flags' off. An or'ed set of flags will be set off.
 * Only included flags will be affected. Serialized calls
 * will only affect flags indicated (leaving others, default or
 * set as they were).
 */
VALUE env_set_flags_off(VALUE obj, VALUE vflags)
{
  /* Delegate to env_set_flags with onoff = 0. */
  const int turn_off = 0;

  return env_set_flags(obj, vflags, turn_off);
}

/*
 * call-seq:
 * env.list_dbs -> [Bdb::Db array]
 *
 * return 0 or more open databases within the receiver environment.
 * If 0, will return [], not nil.
 */
VALUE env_list_dbs(VALUE obj)
{
  t_envh *handle;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  /* The handle's own adb array is returned, not a copy. */
  return handle->adb;
}
static void txn_mark(t_txnh *txn)
{
  /* GC mark hook: mark the env object this txn points at, if any. */
  if (txn->env == NULL)
    return;

  rb_gc_mark(txn->env->self);
}
static void txn_free(t_txnh *txn)
{
  /* Free hook for the t_txnh wrapper: aborts a still-open transaction,
   * removes the txn from its env's atxn array and releases the struct.
   * NOTE(review): this calls rb_ary_delete from a free function and
   * reads txn->env, whose lifetime at this point is not visible here -
   * confirm this is safe during GC. */
#ifdef DEBUG_DB
  if ( RTEST(ruby_debug) )
    fprintf(stderr,"%s/%d %s %p\n",__FILE__,__LINE__,"txn_free",txn);
#endif

  if (txn == NULL)
    return;

  if (txn->txn != NULL)
    txn->txn->abort(txn->txn);
  txn->txn = NULL;

  if (txn->env != NULL)
    rb_ary_delete(txn->env->atxn, txn->self);
  txn->env = NULL;

  free(txn);
}

/*
 * call-seq:
 * env.txn_begin(txn_parent,flags) -> Bdb::Txn
 *
 * start a root transaction or embedded (via txn_parent).
 */
VALUE env_txn_begin(VALUE obj, VALUE vtxn_parent, VALUE vflags)
{
  t_envh *handle;
  t_txnh *parent = NULL;
  t_txnh *child = NULL;
  VALUE txn_obj;
  u_int32_t flags = 0;
  int status;

  if (!NIL_P(vflags))
    flags = NUM2UINT(vflags);

  /* A non-nil parent makes this a nested transaction. */
  if (!NIL_P(vtxn_parent)) {
    Data_Get_Struct(vtxn_parent, t_txnh, parent);
    if (!parent->txn)
        raise(0, "parent txn is closed");
  }

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  txn_obj = Data_Make_Struct(cTxn, t_txnh, txn_mark, txn_free, child);

  status = handle->env->txn_begin(handle->env,
                                  (parent ? parent->txn : NULL),
                                  &(child->txn), flags);
  if (status != 0) {
    raise_error(status, "env_txn_begin: %s", db_strerror(status));
    return Qnil;
  }

  /* Link txn and env both ways: the txn points at its env and the env
   * lists the txn in atxn. */
  child->env = handle;
  child->self = txn_obj;
  rb_ary_push(handle->atxn, txn_obj);

  /* Once we get this working, we'll have to track transactions */
  rb_obj_call_init(txn_obj, 0, NULL);
  return txn_obj;
}

/*
 * call-seq:
 * env.txn_checkpoint(kbyte,min,flags) -> true
 *
 * Cause env transaction state to be checkpointed.
 */
VALUE env_txn_checkpoint(VALUE obj, VALUE vkbyte, VALUE vmin,
			 VALUE vflags)
{
  t_envh *handle;
  u_int32_t flags = 0;
  u_int32_t kbyte = 0;
  u_int32_t minv = 0;
  int status;

  if (!NIL_P(vflags))
    flags = NUM2UINT(vflags);

  /* Non-Fixnum kbyte/min arguments (nil included) are treated as 0. */
  if (FIXNUM_P(vkbyte))
    kbyte = FIX2UINT(vkbyte);
  if (FIXNUM_P(vmin))
    minv = FIX2UINT(vmin);

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  status = handle->env->txn_checkpoint(handle->env, kbyte, minv, flags);
  if (status != 0) {
    raise_error(status, "env_txn_checkpoint: %s", db_strerror(status));
    return Qnil;
  }
  return Qtrue;
}

VALUE env_txn_stat_active(DB_TXN_ACTIVE *t)
{
  /* Build a cTxnStatActive instance carrying @txnid, @parentid and
   * @lsn ([file, offset]) copied from the given record. */
  VALUE entry = rb_class_new_instance(0, NULL, cTxnStatActive);
  VALUE lsn;

  rb_iv_set(entry, "@txnid", INT2FIX(t->txnid));
  rb_iv_set(entry, "@parentid", INT2FIX(t->parentid));
  /*  rb_iv_set(ao,"@thread_id",INT2FIX(t->thread_id)); */

  lsn = rb_ary_new3(2, INT2FIX(t->lsn.file), INT2FIX(t->lsn.offset));
  rb_iv_set(entry, "@lsn", lsn);

  /* XA status is currently excluded */
  return entry;
}

/*
 * call-seq:
 * db.stat(txn,flags) -> Bdb::Db::Stat
 *
 * get database status. Returns a Bdb::Db::Stat object that
 * is specialized to the db_type and only responds to [] to
 * retrieve status values. All values are stored in instance
 * variables, so singleton classes can be created and instance_eval
 * will work.
 */
VALUE db_stat(VALUE obj, VALUE vtxn, VALUE vflags)
{
  /*
   * Fetch the database type and its statistics structure, then copy the
   * fields matching that type into instance variables ("@<field>") of a
   * new cDbStat object. "@dbtype" is always set.
   */
  u_int32_t flags=0;
  int rv;
  t_dbh *dbh;
  t_txnh *txn=NULL;
  DBTYPE dbtype;
  /* One pointer, viewed as the stat struct matching the db type. */
  union {
    void *stat;
    DB_HASH_STAT *hstat;
    DB_BTREE_STAT *bstat;
    DB_QUEUE_STAT *qstat;
  } su;
  VALUE s_obj;

  if ( ! NIL_P(vflags) )
    flags=NUM2UINT(vflags);
  if ( ! NIL_P(vtxn) ) {
    Data_Get_Struct(vtxn,t_txnh,txn);
    if (!txn->txn)
        raise(0, "txn is closed");
  }

  Data_Get_Struct(obj,t_dbh,dbh);
  if (!dbh->db)
    raise(0, "db is closed");

  rv=dbh->db->get_type(dbh->db,&dbtype);
  if (rv)
    raise_error(rv,"db_stat %s",db_strerror(rv));
  /* DB->stat is called with or without a txn argument depending on the
   * BDB version. */
#if DB_VERSION_MAJOR == 5 || DB_VERSION_MINOR > 2
  rv=dbh->db->stat(dbh->db,txn?txn->txn:NULL,&(su.stat),flags);
#else
  rv=dbh->db->stat(dbh->db,&(su.stat),flags);
#endif
  if (rv)
    raise_error(rv,"db_stat %s",db_strerror(rv));

  s_obj=rb_class_new_instance(0,NULL,cDbStat);
  rb_iv_set(s_obj,"@dbtype",INT2FIX(dbtype));

  switch(dbtype) {

  /* hs_int/bs_int/qs_int copy one struct field into "@<field>" on s_obj.
   * They are not #undef'd in this function. */
#define hs_int(field)			      \
  rb_iv_set(s_obj,"@" #field,INT2FIX(su.hstat->field))

  case DB_HASH:
    hs_int(hash_magic);
    hs_int(hash_version);		/* Version number. */
    hs_int(hash_metaflags);	/* Metadata flags. */
    hs_int(hash_nkeys);		/* Number of unique keys. */
    hs_int(hash_ndata);		/* Number of data items. */
    hs_int(hash_pagesize);	/* Page size. */
    hs_int(hash_ffactor);		/* Fill factor specified at create. */
    hs_int(hash_buckets);		/* Number of hash buckets. */
    hs_int(hash_free);		/* Pages on the free list. */
    hs_int(hash_bfree);		/* Bytes free on bucket pages. */
    hs_int(hash_bigpages);	/* Number of big key/data pages. */
    hs_int(hash_big_bfree);	/* Bytes free on big item pages. */
    hs_int(hash_overflows);	/* Number of overflow pages. */
    hs_int(hash_ovfl_free);	/* Bytes free on ovfl pages. */
    hs_int(hash_dup);		/* Number of dup pages. */
    hs_int(hash_dup_free);	/* Bytes free on duplicate pages. */
    break;

#define bs_int(field)					\
    rb_iv_set(s_obj,"@" #field,INT2FIX(su.bstat->field))
    
  /* BTREE and RECNO are both read through the DB_BTREE_STAT view. */
  case DB_BTREE:
  case DB_RECNO:
    bs_int(bt_magic);		/* Magic number. */
    bs_int(bt_version);		/* Version number. */
    bs_int(bt_metaflags);		/* Metadata flags. */
    bs_int(bt_nkeys);		/* Number of unique keys. */
    bs_int(bt_ndata);		/* Number of data items. */
    bs_int(bt_pagesize);		/* Page size. */
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR < 4
    bs_int(bt_maxkey);		/* Maxkey value. */
#endif
    bs_int(bt_minkey);		/* Minkey value. */
    bs_int(bt_re_len);		/* Fixed-length record length. */
    bs_int(bt_re_pad);		/* Fixed-length record pad. */
    bs_int(bt_levels);		/* Tree levels. */
    bs_int(bt_int_pg);		/* Internal pages. */
    bs_int(bt_leaf_pg);		/* Leaf pages. */
    bs_int(bt_dup_pg);		/* Duplicate pages. */
    bs_int(bt_over_pg);		/* Overflow pages. */
#if DB_VERSION_MAJOR == 5 || DB_VERSION_MINOR > 2
    bs_int(bt_empty_pg);		/* Empty pages. */
#endif
    bs_int(bt_free);		/* Pages on the free list. */
    bs_int(bt_int_pgfree);	/* Bytes free in internal pages. */
    bs_int(bt_leaf_pgfree);	/* Bytes free in leaf pages. */
    bs_int(bt_dup_pgfree);	/* Bytes free in duplicate pages. */
    bs_int(bt_over_pgfree);	/* Bytes free in overflow pages. */
    
    break;

#define qs_int(field)					\
    rb_iv_set(s_obj,"@" #field,INT2FIX(su.qstat->field))
    
  case DB_QUEUE:
    qs_int(qs_magic);		/* Magic number. */
    qs_int(qs_version);		/* Version number. */
    qs_int(qs_metaflags);		/* Metadata flags. */
    qs_int(qs_nkeys);		/* Number of unique keys. */
    qs_int(qs_ndata);		/* Number of data items. */
    qs_int(qs_pagesize);		/* Page size. */
    qs_int(qs_extentsize);	/* Pages per extent. */
    qs_int(qs_pages);		/* Data pages. */
    qs_int(qs_re_len);		/* Fixed-length record length. */
    qs_int(qs_re_pad);		/* Fixed-length record pad. */
    qs_int(qs_pgfree);		/* Bytes free in data pages. */
    qs_int(qs_first_recno);	/* First not deleted record. */
    qs_int(qs_cur_recno);		/* Next available record number. */
    
    break;
	/* Unknown type: only @dbtype is set. */
	case DB_UNKNOWN:
		break;
  }

  /* The stat structure filled in by DB->stat is released here. */
  free(su.stat);
  return s_obj;
}

/*
 * call-seq:
 * stat[name] -> value
 *
 * return status value
 */
VALUE stat_aref(VALUE obj, VALUE vname)
{
  /*
   * Look up the instance variable "@<vname>" on obj and return its value.
   *
   * The temporary String is held in a volatile local so the compiler
   * keeps a reference to it for the whole rb_iv_get call; otherwise only
   * its raw buffer pointer would be in use and the String could be
   * collected while the name is still being read.
   */
  volatile VALUE ivname = rb_str_concat(rb_str_new2("@"),vname);

  return rb_iv_get(obj,RSTRING_PTR(ivname));
}

/*
 * call-seq:
 * env.txn_stat -> Bdb::TxnStat
 *
 * get the environment transaction status. Each active
 * transaction will be contained within a Bdb::TxnStat::Active
 * class.
 */
VALUE env_txn_stat(VALUE obj, VALUE vflags)
{
  t_envh *handle;
  DB_TXN_STAT *stats;
  VALUE result;
  VALUE active;
  u_int32_t flags = 0;
  int status;
  int idx;

  if (!NIL_P(vflags))
    flags = NUM2UINT(vflags);

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  /* stats is released with free() at the end of this function */
  status = handle->env->txn_stat(handle->env, &stats, flags);
  if (status != 0)
    raise_error(status, "txn_stat: %s", db_strerror(status));

  result = rb_class_new_instance(0, NULL, cTxnStat);

/* Copy one integer field of *stats into the matching "@<field>" ivar. */
#define txn_stat_fix(field) \
  rb_iv_set(result, "@" #field, INT2FIX(stats->field))

  rb_iv_set(result, "@st_last_ckp",
            rb_ary_new3(2,
                        INT2FIX(stats->st_last_ckp.file),
                        INT2FIX(stats->st_last_ckp.offset)));
  rb_iv_set(result, "@st_time_ckp", rb_time_new(stats->st_time_ckp, 0));
  txn_stat_fix(st_last_txnid);
  txn_stat_fix(st_maxtxns);
  txn_stat_fix(st_nactive);
  txn_stat_fix(st_maxnactive);
  txn_stat_fix(st_nbegins);
  txn_stat_fix(st_naborts);
  txn_stat_fix(st_ncommits);
  txn_stat_fix(st_nrestores);
  txn_stat_fix(st_regsize);
  txn_stat_fix(st_region_wait);
  txn_stat_fix(st_region_nowait);

#undef txn_stat_fix

  /* One cTxnStatActive entry per active transaction record. */
  active = rb_ary_new2(stats->st_nactive);
  rb_iv_set(result, "@st_txnarray", active);

  idx = 0;
  while (idx < stats->st_nactive) {
    rb_ary_push(active, env_txn_stat_active(&(stats->st_txnarray[idx])));
    idx++;
  }

  free(stats);
  return result;
}

/*
 * call-seq:
 * env.set_timeout(timeout,flags) -> timeout
 *
 * set lock and transaction timeout
 */
VALUE env_set_timeout(VALUE obj, VALUE vtimeout, VALUE vflags)
{
  /*
   * Pass timeout and flags to DB_ENV->set_timeout. Returns vtimeout
   * unchanged; a non-zero return code is reported via raise_error.
   */
  t_envh *eh;
  u_int32_t flags=0;
  db_timeout_t timeout;
  int rv;

  if ( ! NIL_P(vflags))
    flags=NUM2UINT(vflags);
  /* NUM2UINT type-checks its argument; FIX2UINT decoded any VALUE,
   * nil or a Bignum included, without checking. */
  timeout=NUM2UINT(vtimeout);

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->set_timeout(eh->env,timeout,flags);
  if ( rv != 0 ) {
    raise_error(rv, "env_set_timeout: %s",db_strerror(rv));
  }

  return vtimeout;
}

/*
 * call-seq:
 * env.get_timeout(flags) -> Fixnum
 *
 * Get current transaction and lock timeout value
 */
VALUE env_get_timeout(VALUE obj, VALUE vflags)
{
  /*
   * Return the value reported by DB_ENV->get_timeout for the given
   * flags as a Ruby Integer.
   */
  t_envh *eh;
  u_int32_t flags=0;
  db_timeout_t timeout = 0;
  int rv;

  if ( ! NIL_P(vflags))
    flags=NUM2UINT(vflags);

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->get_timeout(eh->env,&timeout,flags);
  if ( rv != 0 ) {
    raise_error(rv, "env_get_timeout: %s",db_strerror(rv));
  }

  /* UINT2NUM range-checks and promotes to Bignum when needed; INT2FIX
   * does not. NOTE(review): assumes db_timeout_t is an unsigned 32-bit
   * type - confirm against db.h. */
  return UINT2NUM(timeout);
}

/*
 * call-seq:
 * env.set_tx_max(max) -> max
 *
 * Set the maximum number of transactions with the environment
 */
VALUE env_set_tx_max(VALUE obj, VALUE vmax)
{
  /*
   * Pass max to DB_ENV->set_tx_max. Returns vmax unchanged; a non-zero
   * return code is reported via raise_error.
   */
  t_envh *eh;
  u_int32_t max;
  int rv;

  /* NUM2UINT type-checks its argument, unlike FIX2UINT. */
  max=NUM2UINT(vmax);

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->set_tx_max(eh->env,max);
  if ( rv != 0 ) {
    raise_error(rv, "env_set_tx_max: %s",db_strerror(rv));
  }

  return vmax;
}

/*
 * call-seq:
 * env.get_tx_max -> Fixnum
 *
 * Get current maximum number of transactions
 */
VALUE env_get_tx_max(VALUE obj)
{
  /*
   * Return the value reported by DB_ENV->get_tx_max as a Ruby Integer.
   */
  t_envh *eh;
  u_int32_t max = 0;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->get_tx_max(eh->env,&max);
  if ( rv != 0 ) {
    raise_error(rv, "env_get_tx_max: %s",db_strerror(rv));
  }

  /* UINT2NUM handles the whole unsigned 32-bit range; INT2FIX does no
   * range check. */
  return UINT2NUM(max);
}

/*
 * call-seq:
 * env.mutex_set_max(max) -> max
 *
 * Set the maximum number of mutexes with the environment
 */
VALUE env_mutex_set_max(VALUE obj, VALUE vmax)
{
  /*
   * Pass max to DB_ENV->mutex_set_max. Returns vmax unchanged; a
   * non-zero return code is reported via raise_error.
   */
  t_envh *eh;
  u_int32_t max;
  int rv;

  /* NUM2UINT type-checks its argument, unlike FIX2UINT. */
  max=NUM2UINT(vmax);

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->mutex_set_max(eh->env,max);
  if ( rv != 0 ) {
    raise_error(rv, "env_mutex_set_max: %s",db_strerror(rv));
  }

  return vmax;
}

/*
 * call-seq:
 * env.mutex_get_max -> Fixnum
 *
 * Get current maximum number of mutexes.
 */
VALUE env_mutex_get_max(VALUE obj)
{
  /*
   * Return the value reported by DB_ENV->mutex_get_max as a Ruby Integer.
   */
  t_envh *eh;
  u_int32_t max = 0;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->mutex_get_max(eh->env,&max);
  if ( rv != 0 ) {
    raise_error(rv, "env_mutex_get_max: %s",db_strerror(rv));
  }

  /* UINT2NUM handles the whole unsigned 32-bit range; INT2FIX does no
   * range check. */
  return UINT2NUM(max);
}

/*
 * call-seq:
 * env.set_shm_key(key) -> key
 *
 * Set the shared memory key base
 */
VALUE env_set_shm_key(VALUE obj, VALUE vkey)
{
  /*
   * Pass key to DB_ENV->set_shm_key. Returns vkey unchanged; a non-zero
   * return code is reported via raise_error.
   */
  t_envh *eh;
  long key;
  int rv;

  /* The key is a long: convert with NUM2LONG, which type-checks the
   * argument, rather than decoding it as an unsigned int unchecked. */
  key=NUM2LONG(vkey);

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->set_shm_key(eh->env,key);
  if ( rv != 0 ) {
    raise_error(rv, "env_set_shm_key: %s",db_strerror(rv));
  }

  return vkey;
}

/*
 * call-seq:
 * env.get_shm_key -> Fixnum
 *
 * Get the current shm key base
 */
VALUE env_get_shm_key(VALUE obj)
{
  /*
   * Return the value reported by DB_ENV->get_shm_key as a Ruby Integer.
   */
  t_envh *eh;
  long key = 0;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->get_shm_key(eh->env,&key);
  if ( rv != 0 ) {
    raise_error(rv, "env_get_shm_key: %s",db_strerror(rv));
  }

  /* LONG2NUM covers the full range of long; INT2FIX does no range check. */
  return LONG2NUM(key);
}

VALUE env_log_set_config_h(VALUE obj, u_int32_t flags, VALUE onoff) {
	/*
	 * Shared helper behind the log-config setters: calls
	 * DB_ENV->log_set_config with the given flags, switching them on when
	 * onoff is truthy and off when it is nil or false.
	 * Returns flags as a Ruby Integer.
	 */
	t_envh *eh;
	int rv;
	Data_Get_Struct(obj,t_envh, eh);
	/* Guard against a closed environment, as the other env methods do. */
	if (!eh->env)
		raise_error(0, "env is closed");
	/* The switch is decided by onoff alone. The previous test compared
	 * the C flags word against Qnil, so a nil onoff was treated as "on". */
	rv=eh->env->log_set_config(eh->env, flags, RTEST(onoff) ? 1 : 0);
	if(rv != 0)
		raise_error(rv, "log_set_config: %s", db_strerror(rv));
	/* Box the flags word; returning the raw integer as a VALUE would hand
	 * Ruby an arbitrary object reference. */
	return UINT2NUM(flags);
}

VALUE env_log_get_config_h( VALUE obj, u_int32_t flags) {
	/*
	 * Shared helper behind the log-config getters: asks
	 * DB_ENV->log_get_config about the given flags and returns Qtrue or
	 * Qfalse.
	 */
	t_envh *eh;
	int rv, onoff = 0;
	Data_Get_Struct(obj,t_envh, eh);
	/* Guard against a closed environment, as the other env methods do. */
	if (!eh->env)
		raise_error(0, "env is closed");
	rv=eh->env->log_get_config(eh->env, flags, &onoff);
	if(rv != 0)
		raise_error(rv, "log_get_config: %s", db_strerror(rv));
	return onoff ? Qtrue : Qfalse;
}

/*
 * X-macro list of the log configuration switches. Each entry pairs a
 * lowercase name with the suffix of the matching DB_LOG_* constant.
 * ENV_LOG_CONFIG_FUNC must be defined before ENV_LOG_CONFIG_FUNCS is
 * expanded.
 */
#define ENV_LOG_CONFIG_FUNCS \
	ENV_LOG_CONFIG_FUNC(direct,DIRECT) \
	ENV_LOG_CONFIG_FUNC(dsync,DSYNC) \
	ENV_LOG_CONFIG_FUNC(auto_remove,AUTO_REMOVE) \
	ENV_LOG_CONFIG_FUNC(in_memory,IN_MEMORY) \
	ENV_LOG_CONFIG_FUNC(zero,ZERO)

/*
 * This expansion generates, per entry, env_log_set_<CNST> and
 * env_log_get_<CNST>, which forward to env_log_set_config_h and
 * env_log_get_config_h with DB_LOG_<CNST>. The `name` argument is not
 * used here, and the generated getter ignores its `flags` parameter.
 */
#define ENV_LOG_CONFIG_FUNC( name, cnst) \
	VALUE env_log_set_##cnst( VALUE obj, VALUE flags) { \
		return env_log_set_config_h( obj, DB_LOG_##cnst, flags); \
	} \
	VALUE env_log_get_##cnst( VALUE obj, VALUE flags) { \
		return env_log_get_config_h( obj, DB_LOG_##cnst); \
	}
ENV_LOG_CONFIG_FUNCS

VALUE env_log_set_config( VALUE obj, VALUE flags, VALUE onoff) {
	/* Convert the Ruby flags to a C word and delegate to the shared helper. */
	u_int32_t which = NUM2UINT(flags);

	return env_log_set_config_h(obj, which, onoff);
}

/*
 * call-seq:
 * env.set_lk_detect(detect) -> detect
 *
 * Set when lock detector should be run
 */
VALUE env_set_lk_detect(VALUE obj, VALUE vdetect)
{
  u_int32_t detect = NUM2UINT(vdetect);
  t_envh *handle;
  int status;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  status = handle->env->set_lk_detect(handle->env, detect);
  if (status != 0)
    raise_error(status, "env_set_lk_detect: %s", db_strerror(status));

  /* The argument is handed back unchanged. */
  return vdetect;
}

/*
 * call-seq:
 * env.get_lk_detect() -> detect
 *
 * Get when lock detector should be run
 */
VALUE env_get_lk_detect(VALUE obj)
{
  /*
   * Return the value reported by DB_ENV->get_lk_detect as a Ruby Integer.
   */
  t_envh *eh;
  u_int32_t detect = 0;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->get_lk_detect(eh->env,&detect);
  if ( rv != 0 ) {
    /* The message now names the getter (it previously said
     * "env_set_lk_detect"). */
    raise_error(rv, "env_get_lk_detect: %s",db_strerror(rv));
  }

  return UINT2NUM(detect);
}

/*
 * call-seq:
 * env.set_lk_max_locks(max) -> max
 *
 * Set the maximum number of locks in the environment
 */
VALUE env_set_lk_max_locks(VALUE obj, VALUE vmax)
{
  /*
   * Pass max to DB_ENV->set_lk_max_locks. Returns vmax unchanged; a
   * non-zero return code is reported via raise_error.
   */
  t_envh *eh;
  u_int32_t max;
  int rv;

  /* NUM2UINT type-checks its argument, unlike FIX2UINT. */
  max=NUM2UINT(vmax);

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->set_lk_max_locks(eh->env,max);
  if ( rv != 0 ) {
    raise_error(rv, "env_set_lk_max_locks: %s",db_strerror(rv));
  }

  return vmax;
}

/*
 * call-seq:
 * env.get_lk_max_locks -> max
 *
 * Get the maximum number of locks in the environment
 */
VALUE env_get_lk_max_locks(VALUE obj)
{
  t_envh *handle;
  u_int32_t limit;
  int status;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  /* Read the current value through DB_ENV->get_lk_max_locks. */
  status = handle->env->get_lk_max_locks(handle->env, &limit);
  if (status != 0)
    raise_error(status, "env_get_lk_max_locks: %s", db_strerror(status));

  return UINT2NUM(limit);
}

/*
 * call-seq:
 * env.set_lk_max_objects(max) -> max
 *
 * Set the maximum number of lock objects in the environment
 */
VALUE env_set_lk_max_objects(VALUE obj, VALUE vmax)
{
  /*
   * Pass max to DB_ENV->set_lk_max_objects. Returns vmax unchanged; a
   * non-zero return code is reported via raise_error.
   */
  t_envh *eh;
  u_int32_t max;
  int rv;

  /* NUM2UINT type-checks its argument, unlike FIX2UINT. */
  max=NUM2UINT(vmax);

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->set_lk_max_objects(eh->env,max);
  if ( rv != 0 ) {
    raise_error(rv, "env_set_lk_max_objects: %s",db_strerror(rv));
  }

  return vmax;
}

/*
 * call-seq:
 * env.get_lk_max_objects -> max
 *
 * Get the maximum number of lock objects in the environment
 */
VALUE env_get_lk_max_objects(VALUE obj)
{
  t_envh *handle;
  u_int32_t limit;
  int status;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  /* Read the current value through DB_ENV->get_lk_max_objects. */
  status = handle->env->get_lk_max_objects(handle->env, &limit);
  if (status != 0)
    raise_error(status, "env_get_lk_max_objects: %s", db_strerror(status));

  return UINT2NUM(limit);
}

VALUE env_report_stderr(VALUE obj)
{
  t_envh *handle;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  /* Set the environment's error file to stderr. */
  handle->env->set_errfile(handle->env, stderr);
  return Qtrue;
}

/*
 * call-seq:
 * env.set_data_dir(data_dir) -> data_dir
 *
 * set data_dir
 */
VALUE env_set_data_dir(VALUE obj, VALUE vdata_dir)
{
  const char *dir = StringValueCStr(vdata_dir);
  t_envh *handle;
  int status;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  status = handle->env->set_data_dir(handle->env, dir);
  if (status != 0)
    raise_error(status, "env_set_data_dir: %s", db_strerror(status));

  /* The argument is handed back unchanged. */
  return vdata_dir;
}

/*
 * call-seq:
 * env.get_data_dir -> [data_dir_1, data_dir_2, ...]
 *
 * get data_dir
 */
VALUE env_get_data_dirs(VALUE obj)
{
  /*
   * Return the directories reported by DB_ENV->get_data_dirs as a Ruby
   * Array of Strings, or an empty Array when there are none.
   *
   * The list is walked up to its terminating NULL entry. The previous
   * sizeof(pointer)/sizeof(pointer) length was always 1, so only the
   * first directory was returned and a NULL list was dereferenced.
   * NOTE(review): relies on get_data_dirs returning a NULL-terminated
   * array - confirm against the Berkeley DB reference.
   */
  t_envh *eh;
  const char **data_dirs = NULL;
  VALUE rb_data_dirs;
  int rv;
  int i;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->get_data_dirs(eh->env,&data_dirs);
  if ( rv != 0 ) {
    raise_error(rv, "env_get_data_dir: %s",db_strerror(rv));
  }

  rb_data_dirs = rb_ary_new();
  if (data_dirs != NULL) {
    for (i=0; data_dirs[i] != NULL; i++) {
      rb_ary_push(rb_data_dirs, rb_str_new2(data_dirs[i]));
    }
  }

  return rb_data_dirs;
}

/*
 * call-seq:
 * env.set_lg_dir(lg_dir) -> lg_dir
 *
 * set lg_dir
 */
VALUE env_set_lg_dir(VALUE obj, VALUE vlg_dir)
{
  const char *dir = StringValueCStr(vlg_dir);
  t_envh *handle;
  int status;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  status = handle->env->set_lg_dir(handle->env, dir);
  if (status != 0)
    raise_error(status, "env_set_lg_dir: %s", db_strerror(status));

  /* The argument is handed back unchanged. */
  return vlg_dir;
}

/*
 * call-seq:
 * env.get_lg_dir -> lg_dir
 *
 * get lg_dir
 */
VALUE env_get_lg_dir(VALUE obj)
{
  /*
   * Return the directory reported by DB_ENV->get_lg_dir as a String, or
   * nil when the library reports no directory (NULL).
   */
  t_envh *eh;
  const char *lg_dir = NULL;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->get_lg_dir(eh->env,&lg_dir);
  if ( rv != 0 ) {
    raise_error(rv, "env_get_lg_dir: %s",db_strerror(rv));
  }

  /* rb_str_new2 must not be handed a NULL pointer. */
  if (lg_dir == NULL)
    return Qnil;
  return rb_str_new2(lg_dir);
}

/*
 * call-seq:
 * env.set_tmp_dir(tmp_dir) -> tmp_dir
 *
 * set tmp_dir
 */
VALUE env_set_tmp_dir(VALUE obj, VALUE vtmp_dir)
{
  const char *dir = StringValueCStr(vtmp_dir);
  t_envh *handle;
  int status;

  Data_Get_Struct(obj, t_envh, handle);
  if (!handle->env)
    raise(0, "env is closed");

  status = handle->env->set_tmp_dir(handle->env, dir);
  if (status != 0)
    raise_error(status, "env_set_tmp_dir: %s", db_strerror(status));

  /* The argument is handed back unchanged. */
  return vtmp_dir;
}

/*
 * call-seq:
 * env.get_tmp_dir -> tmp_dir
 *
 * get tmp_dir
 */
VALUE env_get_tmp_dir(VALUE obj)
{
  /*
   * Return the directory reported by DB_ENV->get_tmp_dir as a String, or
   * nil when the library reports no directory (NULL).
   */
  t_envh *eh;
  const char *tmp_dir = NULL;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  if (!eh->env)
    raise(0, "env is closed");
  rv=eh->env->get_tmp_dir(eh->env,&tmp_dir);
  if ( rv != 0 ) {
    raise_error(rv, "env_get_tmp_dir: %s",db_strerror(rv));
  }

  /* rb_str_new2 must not be handed a NULL pointer. */
  if (tmp_dir == NULL)
    return Qnil;
  return rb_str_new2(tmp_dir);
}

/*
 * call-seq:
 * env.home -> home
 *
 * get home as reported by DB_ENV->get_home, wrapped in a new Ruby
 * string. Raises through raise_error when the environment handle is
 * closed or Berkeley DB returns a non-zero code.
 */
VALUE env_get_home(VALUE obj)
{
  t_envh *eh;
  const char *home;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* raise_error (not libc raise) so a closed handle is reported to Ruby
   * instead of falling through to a NULL dereference below. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv=eh->env->get_home(eh->env,&home);
  if ( rv != 0 ) {
    raise_error(rv, "env_get_home: %s",db_strerror(rv));
  }

  return rb_str_new2(home);
}

/*
 * call-seq:
 * env.set_verbose(which, onoff) -> true
 *
 * set verbose messages on or off. which is converted with NUM2UINT and
 * onoff by Ruby truthiness before being passed to DB_ENV->set_verbose.
 * Raises through raise_error when the environment handle is closed or
 * Berkeley DB returns a non-zero code.
 */
VALUE env_set_verbose(VALUE obj, VALUE which, VALUE onoff)
{
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Same closed-handle guard the directory accessors use; without it a
   * closed environment would be dereferenced as NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->set_verbose(eh->env, NUM2UINT(which), RTEST(onoff));

  if ( rv != 0 )  raise_error(rv, "env_set_verbose: %s",db_strerror(rv));

  return Qtrue;
}

/*
 * call-seq:
 * env.encrypt = passwd -> passwd
 *
 * set encrypt: passes passwd to DB_ENV->set_encrypt with the
 * DB_ENCRYPT_AES flag. Returns the passwd argument. Raises through
 * raise_error when the environment handle is closed or Berkeley DB
 * returns a non-zero code.
 */
VALUE env_set_encrypt(VALUE obj, VALUE vpasswd)
{
  t_envh *eh;
  const char *passwd;
  /* Named constant instead of the literal 0x00000001 it stood for. */
  u_int32_t flags=DB_ENCRYPT_AES;
  int rv;

  passwd = StringValueCStr(vpasswd);
  Data_Get_Struct(obj, t_envh, eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");

  rv = eh->env->set_encrypt(eh->env, passwd, flags);
  if ( rv != 0 ) {
    raise_error(rv, "env_set_encrypt: %s",db_strerror(rv));
  }

  return vpasswd;
}

/*
 * call-seq:
 * env.rep_priority = int
 *
 * set the replication priority by passing the value (converted with
 * NUM2UINT) to DB_ENV->rep_set_priority. Returns the argument. Raises
 * through raise_error when the environment handle is closed or
 * Berkeley DB returns a non-zero code.
 */
VALUE env_rep_set_priority(VALUE obj, VALUE priority)
{
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->rep_set_priority(eh->env, NUM2UINT(priority));

  if ( rv != 0 ) raise_error(rv, "env_rep_set_priority: %s", db_strerror(rv));
  return priority;
}

/*
 * call-seq:
 * env.rep_priority -> int
 *
 * returns the replication priority reported by
 * DB_ENV->rep_get_priority. Raises through raise_error when the
 * environment handle is closed or Berkeley DB returns a non-zero code.
 */
VALUE env_rep_get_priority(VALUE obj)
{
  t_envh *eh;
  u_int32_t priority=0;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");

  /* The return code used to be ignored, which could hand an
   * uninitialised value back to Ruby on failure. */
  rv = eh->env->rep_get_priority(eh->env, &priority);
  if ( rv != 0 ) raise_error(rv, "env_rep_get_priority: %s", db_strerror(rv));

  /* priority is unsigned, so convert with UINT2NUM. */
  return UINT2NUM(priority);
}

/*
 * call-seq:
 * env.rep_nsites = int
 *
 * set the replication site count by passing the value (converted with
 * NUM2UINT) to DB_ENV->rep_set_nsites. Returns the argument. Raises
 * through raise_error when the environment handle is closed or
 * Berkeley DB returns a non-zero code.
 */
VALUE env_rep_set_nsites(VALUE obj, VALUE nsites)
{
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->rep_set_nsites(eh->env, NUM2UINT(nsites));

  if ( rv != 0 ) raise_error(rv, "env_rep_set_nsites: %s", db_strerror(rv));
  return nsites;
}

/*
 * call-seq:
 * env.rep_nsites -> int
 *
 * returns the replication site count reported by
 * DB_ENV->rep_get_nsites. Raises through raise_error when the
 * environment handle is closed or Berkeley DB returns a non-zero code.
 */
VALUE env_rep_get_nsites(VALUE obj)
{
  t_envh *eh;
  u_int32_t nsites=0;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");

  /* The return code used to be ignored, which could hand an
   * uninitialised value back to Ruby on failure. */
  rv = eh->env->rep_get_nsites(eh->env, &nsites);
  if ( rv != 0 ) raise_error(rv, "env_rep_get_nsites: %s", db_strerror(rv));

  /* nsites is unsigned, so convert with UINT2NUM. */
  return UINT2NUM(nsites);
}


/*
 * call-seq:
 * env.repmgr_set_local_site(host, port) -> true
 *
 * specify the local site for the replication manager. host is passed
 * as a C string and port is converted with NUM2UINT; the flags
 * argument of DB_ENV->repmgr_set_local_site is always 0. Raises through
 * raise_error when the environment handle is closed or Berkeley DB
 * returns a non-zero code.
 */
VALUE env_repmgr_set_local_site(VALUE obj, VALUE host, VALUE port)
{
  t_envh *eh;
  const char *chost;
  int rv;

  /* StringValueCStr (unlike StringValuePtr) yields a NUL-terminated
   * string, which is what a const char* host parameter needs. */
  chost = StringValueCStr(host);

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->repmgr_set_local_site(eh->env, chost, NUM2UINT(port), 0);

  if ( rv != 0 ) raise_error(rv, "env_repmgr_set_local_site: %s", db_strerror(rv));
  return Qtrue;
}

/*
 * call-seq:
 * env.repmgr_add_remote_site(host, port) -> int
 *
 * add a remote for the replication manager. Returns the integer that
 * DB_ENV->repmgr_add_remote_site stores through its eidp argument; the
 * flags argument is always 0. Raises through raise_error when the
 * environment handle is closed or Berkeley DB returns a non-zero code.
 */
VALUE env_repmgr_add_remote_site(VALUE obj, VALUE host, VALUE port)
{
  t_envh *eh;
  const char *chost;
  int rv;
  int eidp=0;

  /* StringValueCStr (unlike StringValuePtr) yields a NUL-terminated
   * string, which is what a const char* host parameter needs. */
  chost = StringValueCStr(host);

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->repmgr_add_remote_site(eh->env, chost, NUM2UINT(port), &eidp, 0);

  if ( rv != 0 ) raise_error(rv, "env_repmgr_add_remote_site: %s", db_strerror(rv));
  return INT2NUM(eidp);
}

/*
 * call-seq:
 * env.repmgr_ack_policy = int
 *
 * specify how the replication manager will handle acknowledgement of
 * replication messages. The value is converted with NUM2INT and passed
 * to DB_ENV->repmgr_set_ack_policy. Returns the argument. Raises
 * through raise_error when the environment handle is closed or
 * Berkeley DB returns a non-zero code.
 */
VALUE env_repmgr_set_ack_policy(VALUE obj, VALUE policy)
{
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->repmgr_set_ack_policy(eh->env, NUM2INT(policy));

  if ( rv != 0 ) raise_error(rv, "env_repmgr_set_ack_policy: %s", db_strerror(rv));
  return policy;
}

/*
 * call-seq:
 * env.repmgr_ack_policy -> int
 *
 * returns the replication manager's acknowledgement policy as reported
 * by DB_ENV->repmgr_get_ack_policy. Raises through raise_error when the
 * environment handle is closed or Berkeley DB returns a non-zero code.
 */
VALUE env_repmgr_get_ack_policy(VALUE obj)
{
  t_envh *eh;
  int policy=0;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");

  /* The return code used to be ignored, which could hand an
   * uninitialised value back to Ruby on failure. */
  rv = eh->env->repmgr_get_ack_policy(eh->env, &policy);
  if ( rv != 0 ) raise_error(rv, "env_repmgr_get_ack_policy: %s", db_strerror(rv));

  return INT2NUM(policy);
}

/*
 * call-seq:
 * env.repmgr_start(num_threads, flags) -> true
 *
 * start the replication manager. num_threads is converted with NUM2INT
 * and flags with NUM2UINT before being passed to DB_ENV->repmgr_start.
 * Raises through raise_error when the environment handle is closed or
 * Berkeley DB returns a non-zero code.
 */
VALUE env_repmgr_start(VALUE obj, VALUE num_threads, VALUE flags)
{
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->repmgr_start(eh->env, NUM2INT(num_threads), NUM2UINT(flags));

  if ( rv != 0 ) raise_error(rv, "env_repmgr_start: %s", db_strerror(rv));
  return Qtrue;
}

/*
 * call-seq:
 * env.repmgr_stat_print(flags) -> true
 *
 * prints replication manager stats by calling
 * DB_ENV->repmgr_stat_print with flags converted by NUM2UINT. Raises
 * through raise_error when the environment handle is closed or
 * Berkeley DB returns a non-zero code.
 */
VALUE env_repmgr_stat_print(VALUE obj, VALUE flags)
{
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj,t_envh,eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->repmgr_stat_print(eh->env, NUM2UINT(flags));

  if ( rv != 0 ) raise_error(rv, "env_repmgr_stat_print: %s", db_strerror(rv));
  return Qtrue;
}

/*
 * call-seq:
 * env.lg_bsize = size
 *
 * set lg_bsize: size is converted with NUM2UINT and passed to
 * DB_ENV->set_lg_bsize. Returns the argument. Raises through
 * raise_error when the environment handle is closed or Berkeley DB
 * returns a non-zero code.
 */
VALUE env_set_lg_bsize( VALUE obj, VALUE size) {
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj, t_envh, eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->set_lg_bsize( eh->env, NUM2UINT( size));
  if ( rv != 0 )
    raise_error(rv, "env_set_lg_bsize: %s", db_strerror(rv));
  return size;
}

/*
 * call-seq:
 * env.lg_bsize -> int
 *
 * get lg_bsize as reported by DB_ENV->get_lg_bsize. Raises through
 * raise_error when the environment handle is closed or Berkeley DB
 * returns a non-zero code.
 */
VALUE env_get_lg_bsize( VALUE obj) {
  t_envh *eh;
  int rv;
  u_int32_t size=0;

  Data_Get_Struct( obj, t_envh, eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->get_lg_bsize( eh->env, &size);
  if ( rv != 0 )
    raise_error(rv, "env_get_lg_bsize: %s", db_strerror(rv));
  /* UINT2NUM is the Ruby C API conversion for unsigned ints and promotes
   * to Bignum when a 32-bit value does not fit in a Fixnum. */
  return UINT2NUM(size);
}

/*
 * call-seq:
 * env.lg_max = size
 *
 * set lg_max: size is converted with NUM2UINT and passed to
 * DB_ENV->set_lg_max. Returns the argument. Raises through raise_error
 * when the environment handle is closed or Berkeley DB returns a
 * non-zero code.
 */
VALUE env_set_lg_max( VALUE obj, VALUE size) {
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj, t_envh, eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->set_lg_max( eh->env, NUM2UINT( size));
  if ( rv != 0 )
    raise_error(rv, "env_set_lg_max: %s", db_strerror(rv));
  return size;
}

/*
 * call-seq:
 * env.lg_max -> int
 *
 * get lg_max as reported by DB_ENV->get_lg_max. Raises through
 * raise_error when the environment handle is closed or Berkeley DB
 * returns a non-zero code.
 */
VALUE env_get_lg_max( VALUE obj) {
  t_envh *eh;
  int rv;
  u_int32_t size=0;

  Data_Get_Struct( obj, t_envh, eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->get_lg_max( eh->env, &size);
  if ( rv != 0 )
    raise_error(rv, "env_get_lg_max: %s", db_strerror(rv));
  /* UINT2NUM is the Ruby C API conversion for unsigned ints and promotes
   * to Bignum when a 32-bit value does not fit in a Fixnum. */
  return UINT2NUM(size);
}

/*
 * call-seq:
 * env.lg_regionmax = size
 *
 * set lg_regionmax: size is converted with NUM2UINT and passed to
 * DB_ENV->set_lg_regionmax. Returns the argument. Raises through
 * raise_error when the environment handle is closed or Berkeley DB
 * returns a non-zero code.
 */
VALUE env_set_lg_regionmax( VALUE obj, VALUE size) {
  t_envh *eh;
  int rv;

  Data_Get_Struct(obj, t_envh, eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->set_lg_regionmax( eh->env, NUM2UINT( size));
  if ( rv != 0 )
    raise_error(rv, "env_set_lg_regionmax: %s", db_strerror(rv));
  return size;
}

/*
 * call-seq:
 * env.lg_regionmax -> int
 *
 * get lg_regionmax as reported by DB_ENV->get_lg_regionmax. Raises
 * through raise_error when the environment handle is closed or
 * Berkeley DB returns a non-zero code.
 */
VALUE env_get_lg_regionmax( VALUE obj) {
  t_envh *eh;
  int rv;
  u_int32_t size=0;

  Data_Get_Struct( obj, t_envh, eh);
  /* Guard against a closed handle rather than dereferencing NULL. */
  if (!eh->env)
    raise_error(0, "env is closed");
  rv = eh->env->get_lg_regionmax( eh->env, &size);
  if ( rv != 0 )
    raise_error(rv, "env_get_lg_regionmax: %s", db_strerror(rv));
  /* UINT2NUM is the Ruby C API conversion for unsigned ints and promotes
   * to Bignum when a 32-bit value does not fit in a Fixnum. */
  return UINT2NUM(size);
}


/*
 * Mark a transaction wrapper as finished: clear its DB_TXN pointer and,
 * if it is still attached to an environment wrapper, remove it from
 * that environment's atxn array and drop the back-reference.
 * Emits a Ruby warning first when ruby_debug is set.
 */
static void txn_finish(t_txnh *txn)
{
  if ( RTEST(ruby_debug) ) {
    rb_warning("%s/%d %s 0x%p",__FILE__,__LINE__,"txn_finish",(void*)txn);
  }

  txn->txn=NULL;

  /* Nothing to detach from when the environment link is already gone. */
  if (txn->env == NULL)
    return;

  rb_ary_delete(txn->env->atxn,txn->self);
  txn->env=NULL;
}

/*
 * call-seq:
 * txn.commit(flags) -> true
 *
 * commit a transaction. flags may be nil (treated as 0). Returns false
 * without doing anything when the transaction is already finished.
 * The wrapper is marked finished via txn_finish whether or not the
 * commit succeeded; a non-zero result is then reported with
 * raise_error.
 */
VALUE txn_commit(VALUE obj, VALUE vflags)
{
  t_txnh *handle=NULL;
  u_int32_t commit_flags=NIL_P(vflags) ? 0 : NUM2UINT(vflags);
  int status;

  Data_Get_Struct(obj,t_txnh,handle);

  if (handle->txn == NULL)
    return Qfalse;

  status=handle->txn->commit(handle->txn,commit_flags);
  txn_finish(handle);

  if ( status == 0 )
    return Qtrue;

  raise_error(status, "txn_commit: %s",db_strerror(status));
  return Qnil;
}

/*
 * call-seq:
 * txn.abort -> true
 *
 * abort a transaction. Returns false without doing anything when the
 * transaction is already finished. The wrapper is marked finished via
 * txn_finish whether or not the abort succeeded; a non-zero result is
 * then reported with raise_error.
 */
VALUE txn_abort(VALUE obj)
{
  t_txnh *handle=NULL;
  int status;

  Data_Get_Struct(obj,t_txnh,handle);

  if (handle->txn == NULL)
    return Qfalse;

  status=handle->txn->abort(handle->txn);
  txn_finish(handle);

  if ( status == 0 )
    return Qtrue;

  raise_error(status, "txn_abort: %s",db_strerror(status));
  return Qnil;
}

/*
 * call-seq:
 * txn.discard -> true
 *
 * discard a transaction. Since prepare is not yet supported,
 * I don't think this has much value.
 *
 * Raises through raise_error when the transaction is already finished
 * or when DB_TXN->discard returns a non-zero code. The wrapper is
 * marked finished via txn_finish in either outcome of the discard.
 */
VALUE txn_discard(VALUE obj)
{
  t_txnh *txn=NULL;
  int rv;

  Data_Get_Struct(obj,t_txnh,txn);

  if (!txn->txn)
    raise_error(0,"txn is closed");

  rv=txn->txn->discard(txn->txn,NOFLAGS);
  txn_finish(txn);
  if ( rv != 0 ) {
    /* Label the failure with this function's name (was "txn_abort"). */
    raise_error(rv, "txn_discard: %s",db_strerror(rv));
    return Qnil;
  }
  return Qtrue;
}

/*
 * call-seq:
 * txn.tid -> Integer
 * 
 * return the transaction id, (named tid to not conflict with
 * ruby's id method). Raises through raise_error when the transaction
 * is already finished.
 */
VALUE txn_id(VALUE obj)
{
  t_txnh *txn=NULL;
  u_int32_t id;

  Data_Get_Struct(obj,t_txnh,txn);
  if (!txn->txn)
    raise_error(0,"txn is closed");

  /* DB_TXN->id yields an unsigned 32-bit value; keeping it unsigned and
   * converting with UINT2NUM avoids negative or overflowed ids when the
   * high bit is set. */
  id=txn->txn->id(txn->txn);
  return UINT2NUM(id);
}

/*
 * call-seq:
 * txn.set_timeout(timeout,flags) -> true
 *
 * set transaction lock timeout. flags may be nil (treated as 0);
 * timeout must be a Fixnum. Raises through raise_error when timeout is
 * not a Fixnum, when the transaction is already finished, or when
 * DB_TXN->set_timeout returns a non-zero code.
 */
VALUE txn_set_timeout(VALUE obj, VALUE vtimeout, VALUE vflags)
{
  t_txnh *handle=NULL;
  db_timeout_t lock_timeout;
  u_int32_t timeout_flags=NIL_P(vflags) ? 0 : NUM2UINT(vflags);
  int status;

  if ( ! FIXNUM_P(vtimeout) )
    raise_error(0,"timeout must be a fixed integer");
  lock_timeout=FIX2UINT(vtimeout);

  Data_Get_Struct(obj,t_txnh,handle);

  if (handle->txn == NULL)
    raise_error(0,"txn is closed");

  status=handle->txn->set_timeout(handle->txn,lock_timeout,timeout_flags);
  if ( status == 0 )
    return Qtrue;

  raise_error(status, "txn_set_timeout: %s",db_strerror(status));
  return Qnil;
}

/*
 * Document-class: Bdb
 *
 * Ruby library that wraps the Sleepycat Berkeley DB.
 * 
 * Developed against 4.3/4.4. No support for prior versions.
 */

void Init_bdb(void) {
  /* Extension entry point: interns the method/ivar names used by this
   * file, then defines the Bdb module, its classes, their constants and
   * their methods. */
  fv_call=rb_intern("call");
  fv_uniq=rb_intern("uniq");
  fv_err_new=rb_intern("new");
  fv_err_code=rb_intern("@code");
  fv_err_msg=rb_intern("@message");

  mBdb = rb_define_module("Bdb");

#include "bdb_aux._c"

  cDb = rb_define_class_under(mBdb,"Db", rb_cObject);
  eDbError = rb_define_class_under(mBdb,"DbError",rb_eStandardError);
  /* eDbE_create is reused with a different body in several places;
   * drop any previous definition before redefining it. */
#undef eDbE_create
#define eDbE_create(n,c) eDbE_##c = rb_define_class_under(mBdb, #c, eDbError);
EXCEPTIONS_CREATE

  rb_define_method(eDbError,"initialize",err_initialize,2);
  rb_define_method(eDbError,"code",err_code,0);

  rb_define_const(cDb,"BTREE",INT2FIX((DBTYPE)(DB_BTREE)));
  rb_define_const(cDb,"HASH",INT2FIX((DBTYPE)(DB_HASH)));
  rb_define_const(cDb,"RECNO",INT2FIX((DBTYPE)(DB_RECNO)));
  rb_define_const(cDb,"QUEUE",INT2FIX((DBTYPE)(DB_QUEUE)));
  rb_define_const(cDb,"UNKNOWN",INT2FIX((DBTYPE)(DB_UNKNOWN)));

  rb_define_alloc_func(cDb,db_alloc);
  rb_define_method(cDb,"initialize",db_initialize,0);

  rb_define_method(cDb,"put",db_put,4);
  rb_define_method(cDb,"get",db_get,4);
  rb_define_method(cDb,"pget",db_pget,4);
  rb_define_method(cDb,"del",db_del,3);
  rb_define_method(cDb,"cursor",db_cursor,2);
  rb_define_method(cDb,"associate",db_associate,4);
  rb_define_method(cDb,"btree_compare=",db_btree_compare_set,1);
  rb_define_method(cDb,"re_len=",db_set_re_len,1);
  rb_define_method(cDb,"re_len",db_get_re_len,0);
  rb_define_method(cDb,"flags=",db_flags_set,1);
  rb_define_method(cDb,"flags",db_flags_get,0);
  rb_define_method(cDb,"open",db_open,6);
  rb_define_method(cDb,"close",db_close,1);
  rb_define_method(cDb,"[]",db_aget,1);
  rb_define_method(cDb,"[]=",db_aset,2);
  rb_define_method(cDb,"join",db_join,2);
  rb_define_method(cDb,"get_byteswapped",db_get_byteswapped,0);
  rb_define_method(cDb,"get_type",db_get_type,0);
  rb_define_method(cDb,"remove",db_remove,3);
  rb_define_method(cDb,"key_range",db_key_range,3);
  rb_define_method(cDb,"rename",db_rename,4);
  rb_define_method(cDb,"pagesize",db_pagesize,0);
  rb_define_method(cDb,"pagesize=",db_pagesize_set,1);
  rb_define_method(cDb,"h_ffactor",db_h_ffactor,0);
  rb_define_method(cDb,"h_ffactor=",db_h_ffactor_set,1);
  rb_define_method(cDb,"h_nelem",db_h_nelem,0);
  rb_define_method(cDb,"h_nelem=",db_h_nelem_set,1);
  rb_define_method(cDb,"stat",db_stat,2);
  cDbStat = rb_define_class_under(cDb,"Stat",rb_cObject);
  rb_define_method(cDbStat,"[]",stat_aref,1);

  rb_define_method(cDb,"sync",db_sync,0);
  rb_define_method(cDb,"truncate",db_truncate,1);

  rb_define_method(cDb,"encrypt=",db_set_encrypt,1);

#if DB_VERSION_MAJOR == 5 || DB_VERSION_MINOR > 3
  rb_define_method(cDb,"compact",db_compact,5);
#endif

  cCursor = rb_define_class_under(cDb,"Cursor",rb_cObject);
  rb_define_method(cCursor,"get",dbc_get,3);
  rb_define_method(cCursor,"pget",dbc_pget,3);
  rb_define_method(cCursor,"put",dbc_put,3);
  rb_define_method(cCursor,"close",dbc_close,0);
  rb_define_method(cCursor,"del",dbc_del,0);
  rb_define_method(cCursor,"count",dbc_count,0);

  cEnv = rb_define_class_under(mBdb,"Env",rb_cObject);
  rb_define_singleton_method(cEnv,"new",env_new,1);
  rb_define_method(cEnv,"open",env_open,3);
  rb_define_method(cEnv,"close",env_close,0);
  rb_define_method(cEnv,"db",env_db,0);
  rb_define_method(cEnv,"cachesize=",env_set_cachesize,1);
  rb_define_method(cEnv,"cachesize",env_get_cachesize,0);
  rb_define_method(cEnv,"flags",env_get_flags,0);
  rb_define_method(cEnv,"flags_on=",env_set_flags_on,1);
  rb_define_method(cEnv,"flags_off=",env_set_flags_off,1);
  rb_define_method(cEnv,"list_dbs",env_list_dbs,0);
  rb_define_method(cEnv,"txn_begin",env_txn_begin,2);
  rb_define_method(cEnv,"txn_checkpoint",env_txn_checkpoint,3);
  rb_define_method(cEnv,"txn_stat",env_txn_stat,1);
  /* Both timeout functions used to be registered as "timeout", so the
   * getter silently replaced the setter. The setter now has its own
   * name (matching Txn#set_timeout); "timeout" still resolves to the
   * getter exactly as before. */
  rb_define_method(cEnv,"set_timeout",env_set_timeout,2);
  rb_define_method(cEnv,"timeout",env_get_timeout,1);
  rb_define_method(cEnv,"mutex_get_max",env_mutex_get_max,0);
  rb_define_method(cEnv,"mutex_set_max",env_mutex_set_max,1);
  rb_define_method(cEnv,"tx_max=",env_set_tx_max,1);
  rb_define_method(cEnv,"tx_max",env_get_tx_max,0);
  rb_define_method(cEnv,"report_stderr",env_report_stderr,0);
  rb_define_method(cEnv,"lk_detect=",env_set_lk_detect,1);
  rb_define_method(cEnv,"lk_detect",env_get_lk_detect,0);
  rb_define_method(cEnv,"lk_max_locks=",env_set_lk_max_locks,1);
  rb_define_method(cEnv,"lk_max_locks",env_get_lk_max_locks,0);
  rb_define_method(cEnv,"lk_max_objects=",env_set_lk_max_objects,1);
  rb_define_method(cEnv,"lk_max_objects",env_get_lk_max_objects,0);
  rb_define_method(cEnv,"shm_key=",env_set_shm_key,1);
  rb_define_method(cEnv,"shm_key",env_get_shm_key,0);
  rb_define_method(cEnv,"log_config",env_log_set_config,2);
  /* Registers a "log_<name>=" / "log_<name>" pair for every entry that
   * ENV_LOG_CONFIG_FUNCS expands to. */
#undef ENV_LOG_CONFIG_FUNC
#define ENV_LOG_CONFIG_FUNC(name,cnst) \
	rb_define_method(cEnv,"log_"#name "=",env_log_set_##cnst,1); \
	rb_define_method(cEnv,"log_"#name,env_log_get_##cnst,0);
  ENV_LOG_CONFIG_FUNCS

  rb_define_method(cEnv,"data_dir=",env_set_data_dir,1);
  rb_define_method(cEnv,"data_dirs",env_get_data_dirs,0);
  rb_define_method(cEnv,"lg_dir=",env_set_lg_dir,1);
  rb_define_method(cEnv,"lg_dir",env_get_lg_dir,0);
  rb_define_method(cEnv,"tmp_dir=",env_set_tmp_dir,1);
  rb_define_method(cEnv,"tmp_dir",env_get_tmp_dir,0);
  rb_define_method(cEnv,"home",env_get_home,0);
  rb_define_method(cEnv,"set_verbose",env_set_verbose,2);

  rb_define_method(cEnv,"encrypt=",env_set_encrypt,1);
  //rb_define_method(cEnv,"encrypt",env_get_encrypt,0);

  rb_define_method(cEnv,"rep_priority=", env_rep_set_priority, 1);
  rb_define_method(cEnv,"rep_priority", env_rep_get_priority, 0);
  rb_define_method(cEnv,"rep_nsites=", env_rep_set_nsites, 1);
  rb_define_method(cEnv,"rep_nsites", env_rep_get_nsites, 0);
  rb_define_method(cEnv,"repmgr_set_local_site", env_repmgr_set_local_site, 2);
  rb_define_method(cEnv,"repmgr_add_remote_site", env_repmgr_add_remote_site, 2);
  rb_define_method(cEnv,"repmgr_ack_policy=", env_repmgr_set_ack_policy, 1);
  rb_define_method(cEnv,"repmgr_ack_policy", env_repmgr_get_ack_policy, 0);
  rb_define_method(cEnv,"repmgr_start", env_repmgr_start, 2);
  rb_define_method(cEnv,"repmgr_stat_print", env_repmgr_stat_print, 1);

  rb_define_method(cEnv,"lg_bsize=", env_set_lg_bsize, 1);
  rb_define_method(cEnv,"lg_bsize", env_get_lg_bsize, 0);
  rb_define_method(cEnv,"lg_max=", env_set_lg_max, 1);
  rb_define_method(cEnv,"lg_max", env_get_lg_max, 0);
  rb_define_method(cEnv,"lg_regionmax=", env_set_lg_regionmax, 1);
  rb_define_method(cEnv,"lg_regionmax", env_get_lg_regionmax, 0);

  cTxnStat = rb_define_class_under(mBdb,"TxnStat",rb_cObject);
  rb_define_method(cTxnStat,"[]",stat_aref,1);

  cTxnStatActive =
    rb_define_class_under(cTxnStat,"Active",rb_cObject);
  rb_define_method(cTxnStatActive,"[]",stat_aref,1);

  cTxn = rb_define_class_under(mBdb,"Txn",rb_cObject);
  rb_define_method(cTxn,"commit",txn_commit,1);
  rb_define_method(cTxn,"abort",txn_abort,0);
  rb_define_method(cTxn,"discard",txn_discard,0);
  rb_define_method(cTxn,"tid",txn_id,0);
  rb_define_method(cTxn,"set_timeout",txn_set_timeout,2);
}
